Python: check count of processed elements when using multiprocessing.map
I am using the multiprocessing module for parallel URL retrieval. My code is like this:
import re
import urllib2
import multiprocessing

pat = re.compile("(?P<url>https?://[^\s]+)")

def resolve_url(text):
    missing = 0
    bad = 0
    url = 'before'
    long_url = 'after'
    match = pat.search(text)  ## a text looks like "I am at here. http:.....(a URL)"
    if not match:
        missing = 1
    else:
        url = match.group("url")
        try:
            long_url = urllib2.urlopen(url).url
        except:
            bad = 1
    return (url, long_url, missing, bad)

if __name__ == '__main__':
    pool = multiprocessing.Pool(100)
    resolved_urls = pool.map(resolve_url, checkin5)  ## checkin5 is a list of texts
The problem is that my checkin5 list contains about 600,000 elements, so this parallel job really takes time. I want to check how many elements have been resolved while it runs. In a simple for loop, I could do it like this:
resolved_urls = []
now = time.time()
for i, element in enumerate(checkin5):
    resolved_urls.append(resolve_url(element))
    if i % 1000 == 0:
        print("from %d to %d: %2.5f seconds" % (i - 1000, i, time.time() - now))
        now = time.time()
But now I need to improve efficiency, so multiprocessing is necessary; I just don't know how to check the progress in this situation. Any ideas?
By the way, to check whether the approach above also works in this case, I tried some toy code:
import multiprocessing
import time

def cal(x):
    res = x * x
    return res

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)

    t0 = time.time()
    result_list = pool.map(cal, range(1000000))
    print(time.time() - t0)

    t0 = time.time()
    for i, result in enumerate(pool.map(cal, range(1000000))):
        if i % 100000 == 0:
            print("%d elements have been calculated, %2.5f" % (i, time.time() - t0))
            t0 = time.time()
The result is:
0.465271949768
0 elements have been calculated, 0.45459
100000 elements have been calculated, 0.02211
200000 elements have been calculated, 0.02142
300000 elements have been calculated, 0.02118
400000 elements have been calculated, 0.01068
500000 elements have been calculated, 0.01038
600000 elements have been calculated, 0.01391
700000 elements have been calculated, 0.01174
800000 elements have been calculated, 0.01098
900000 elements have been calculated, 0.01319
From these results, I think the single-process approach does not work here. It seems that pool.map is called first, and only after the calculation is finished and the complete list is obtained does enumerate begin... Am I right?
You should be able to do this with Pool.imap or Pool.imap_unordered, depending on whether or not you care about the ordering of the results. They are both non-blocking...
resolved_urls = []
pool = multiprocessing.Pool(100)
res = pool.imap(resolve_url, checkin5)

for x in res:
    resolved_urls.append(x)
    print('finished one')
    # ... whatever counting/tracking code you want here
First of all, I believe @danf1024 has the answer. What follows addresses the slowdown you see when switching from pool.map to pool.imap.
Here is a little experiment:
from multiprocessing import Pool

def square(x):
    return x * x

N = 10 ** 4
l = list(range(N))

def test_map(n=N):
    list(Pool().map(square, l))
# In [3]: %timeit -n10 q.test_map()
# 10 loops, best of 3: 14.2 ms per loop

def test_imap(n=N):
    list(Pool().imap(square, l))
# In [4]: %timeit -n10 q.test_imap()
# 10 loops, best of 3: 232 ms per loop

def test_imap1(n=N):
    list(Pool(processes=1).imap(square, l))
# In [5]: %timeit -n10 q.test_imap1()
# 10 loops, best of 3: 191 ms per loop

def test_map_naive(n=N):
    # cast map to list in python3
    list(map(square, l))
# In [6]: %timeit -n10 q.test_map_naive()
# 10 loops, best of 3: 1.2 ms per loop
Because squaring is a cheap operation compared with, say, downloading and parsing a web page, parallelization only pays off if each processor can receive large, uninterrupted chunks of input. This is not the case with imap, which performs very poorly on my 4 cores. Interestingly, restricting the number of processes to 1 makes imap run faster, because the contention is removed.
However, as you move to more costly operations, the difference between imap and map becomes less and less significant.