我可以使用池回调实现多处理计数器吗?
Can I implement a counter for multiprocessing using pool callback?
我在谷歌上搜索了一下如何正确构建计数器以跟踪已完成工作的进度。到目前为止,似乎所有答案都涉及 lock
和 Value
.
的使用
我想知道我是否可以使用回调来实现它。似乎回调是在主进程中执行的,而不是工作人员所在的子进程。我可以进一步假设它是在同一个线程中执行的,因此根本没有竞争条件吗?
import time
import multiprocessing
import os
Pool = multiprocessing.Pool
def sqr(a):
time.sleep(0.5)
print 'local {}'.format(os.getpid())
return a * a
pool = Pool(processes=4)
class Counter(object):
def __init__(self):
self.value = 0
def incr(self, x):
self.value += 1
print 'count {}'.format(self.value)
print 'callback {}'.format(os.getpid())
counter = Counter()
r = [pool.apply_async(sqr, (x,), callback=counter.incr) for x in range(10)]
pool.close()
pool.join()
local 27155local 27154local 27156
count 1
callback 27152
count 2
callback 27152
count 3
callback 27152
local 27153
count 4
callback 27152
local 27155
count 5
callback 27152
local 27156
local 27154
count 6
callback 27152
count 7
callback 27152
local 27153
count 8
callback 27152
local 27155
count 9
callback 27152
local 27156
count 10
callback 27152
main 27152
main count 10
Process finished with exit code 0
更新
好的,看来这 link 解释了回调背后的一些机制。
所以实际上它在主进程中的不同线程上运行。
但是,我仍然可以用同样的方式实现计数器吗,因为只有 1 个线程修改计数器?
或者您可以使用 imap_unordered 在循环中计数,如 bj0 所述:
results = []
for count, result in enumerate(pool.imap_unordered(sqr, range(10)), 1):
results.append(result)
print(count)
就我个人而言,我发现处理 imap_unordered()
返回的原始结果比 apply_async()
返回的结果对象更直接。
从@ami-tavory 评论中的 SO link 来看,回调似乎都可能在同一个线程上调用。由于这在文档或 api 中未指定,因此我不会依赖它,因为它可能会在未来发生变化或取决于实现。
Python 没有原子增量(某些 itertools trick that relies on the GIL 除外),因此要确保线程安全,您必须使用锁或其他某种形式的同步。你为什么要避免它?它可以用作上下文管理器,使代码非常精简:
from threading import Lock
class Counter(object):
def __init__(self):
self.value = 0
self.lock = Lock()
def incr(self, x):
with self.lock:
self.value += 1
另一种方法是使用 imap_unordered
,在结果可用时循环处理结果(在主线程中)并在那里更新您的 progress/counter。
我在谷歌上搜索了一下如何正确构建计数器以跟踪已完成工作的进度。到目前为止,似乎所有答案都涉及 lock
和 Value
.
我想知道我是否可以使用回调来实现它。似乎回调是在主进程中执行的,而不是工作人员所在的子进程。我可以进一步假设它是在同一个线程中执行的,因此根本没有竞争条件吗?
import time
import multiprocessing
import os
Pool = multiprocessing.Pool
def sqr(a):
time.sleep(0.5)
print 'local {}'.format(os.getpid())
return a * a
pool = Pool(processes=4)
class Counter(object):
def __init__(self):
self.value = 0
def incr(self, x):
self.value += 1
print 'count {}'.format(self.value)
print 'callback {}'.format(os.getpid())
counter = Counter()
r = [pool.apply_async(sqr, (x,), callback=counter.incr) for x in range(10)]
pool.close()
pool.join()
local 27155local 27154local 27156
count 1
callback 27152
count 2
callback 27152
count 3
callback 27152
local 27153
count 4
callback 27152
local 27155
count 5
callback 27152
local 27156
local 27154
count 6
callback 27152
count 7
callback 27152
local 27153
count 8
callback 27152
local 27155
count 9
callback 27152
local 27156
count 10
callback 27152
main 27152
main count 10
Process finished with exit code 0
更新
好的,看来这 link 解释了回调背后的一些机制。
所以实际上它在主进程中的不同线程上运行。
但是,我仍然可以用同样的方式实现计数器吗,因为只有 1 个线程修改计数器?
或者您可以使用 imap_unordered 在循环中计数,如 bj0 所述:
results = []
for count, result in enumerate(pool.imap_unordered(sqr, range(10)), 1):
results.append(result)
print(count)
就我个人而言,我发现处理 imap_unordered()
返回的原始结果比 apply_async()
返回的结果对象更直接。
从@ami-tavory 评论中的 SO link 来看,回调似乎都可能在同一个线程上调用。由于这在文档或 api 中未指定,因此我不会依赖它,因为它可能会在未来发生变化或取决于实现。
Python 没有原子增量(某些 itertools trick that relies on the GIL 除外),因此要确保线程安全,您必须使用锁或其他某种形式的同步。你为什么要避免它?它可以用作上下文管理器,使代码非常精简:
from threading import Lock
class Counter(object):
def __init__(self):
self.value = 0
self.lock = Lock()
def incr(self, x):
with self.lock:
self.value += 1
另一种方法是使用 imap_unordered
,在结果可用时循环处理结果(在主线程中)并在那里更新您的 progress/counter。