Gevent: how to wait for a set of greenlets to finish
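I have a fixed-size `gevent.pool.Pool` shared among several task producers. Whenever the pool has a free slot, any task producer can apply a new greenlet to it. After adding its tasks to the pool, a task producer should wait until all of the tasks it added have finished.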
I tried to use `gevent.queue.JoinableQueue` to wait for all the tasks to finish. It works, except that I get a very annoying exception at the end of the wait.
How can I fix the code below to avoid this? Or am I doing something wrong?
```python
from gevent import monkey, sleep; monkey.patch_all()
from gevent.queue import JoinableQueue
from gevent.pool import Pool

pool = Pool(3)

def worker(n):
    print 'Worker {} started'.format(n)
    sleep(1)
    print 'Worker {} finished'.format(n)
    return n

def main():
    results = []
    queue = JoinableQueue()
    for job_no in range(5):
        pool.wait_available()
        greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret))
        queue.put(greenlet)
        sleep(.05)
    print 'All workers added'
    queue.join()
    print 'All workers finished', results

if __name__ == '__main__':
    main()
```
Output:
```
Worker 0 started
Worker 1 started
Worker 2 started
Worker 0 finished
Worker 3 started
Worker 1 finished
Worker 4 started
All workers added
Worker 2 finished
Worker 3 finished
Worker 4 finished
Traceback (most recent call last):
  File "main.py", line 32, in <module>
    main()
  File "main.py", line 27, in main
    queue.join()
  File "C:\Python.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join
    return self._cond.wait(timeout=timeout)
  File "C:\Python.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait
    return self._wait(timeout)
  File "C:\Python.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait
    gotit = self._wait_core(timeout)
  File "C:\Python.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core
    result = self.hub.switch()
  File "C:\Python.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch
    return RawGreenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>)
```
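You get the 'This operation would block forever' error because no greenlet consumes the tasks in the queue. `queue.join()` blocks until `task_done()` has been called once for every item that was `put()` into the queue, and your code never calls `task_done()`. So `queue.join()` just blocks until all the workers have finished; once there is nothing left for gevent to run, it knows the wait can never complete and raises `LoopExit`.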
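The mechanism described above can be reproduced in isolation. This is a minimal sketch, not part of the original answer: `demo` and `consumer` are hypothetical names, and it assumes gevent's `JoinableQueue` follows the same `put()`/`task_done()`/`join()` bookkeeping as the standard library's `queue.Queue`, as gevent documents. Without `task_done()` the first call should end in `LoopExit`; with it, `join()` should return normally. It is written so it runs under both Python 2 and 3.

```python
import gevent
from gevent.hub import LoopExit
from gevent.queue import JoinableQueue


def demo(call_task_done):
    """Put three items in a JoinableQueue, consume them, and report how join() ends."""
    queue = JoinableQueue()

    def consumer():
        # Hypothetical consumer: takes items forever, optionally marking each one done.
        while True:
            queue.get()  # take the next item (its value is not needed here)
            gevent.sleep(0.01)  # pretend to process the item
            if call_task_done:
                queue.task_done()  # decrement the queue's unfinished-task counter

    gevent.spawn(consumer)
    for n in range(3):
        queue.put(n)  # each put() increments the unfinished-task counter

    try:
        queue.join()  # blocks until the counter drops back to zero
        return 'joined'
    except LoopExit:
        # The consumer is blocked in get(), the counter is still 3, and nothing
        # else can run, so gevent reports that the wait would block forever.
        return 'LoopExit'


if __name__ == '__main__':
    print(demo(call_task_done=False))
    print(demo(call_task_done=True))
```

Calling `task_done()` in the question's callback would make `join()` return as well, but as the answer goes on to say, the queue is not needed for this at all.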
You don't need a `JoinableQueue` here; use `gevent.joinall()` to wait for all the greenlets to finish:
```python
import gevent

def main():
    results = []
    gs = []
    for job_no in range(5):
        greenlet = ..  # create the greenlet as in the question (elided in the answer)
        gs.append(greenlet)
    gevent.joinall(gs)
    print 'All workers finished', results
```
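A complete, runnable version of the answer's approach, sketched for the multi-producer setup in the question. It rests on a few assumptions that are not in the original answer: `producer` is a hypothetical name; it uses `Pool.spawn()`, which blocks while the pool is full, in place of the `wait_available()`/`apply_async()` pair; and it reads each result from the greenlet's `value` attribute after `gevent.joinall()` instead of collecting results through a callback.

```python
import gevent
from gevent.pool import Pool

pool = Pool(3)  # one fixed-size pool shared by every producer


def worker(n):
    gevent.sleep(0.05)  # stand-in for real work
    return n


def producer(jobs):
    """Hypothetical producer: submit `jobs` to the shared pool, then wait only for them."""
    gs = []
    for n in jobs:
        # Pool.spawn() blocks while all 3 slots are busy, so no wait_available() is needed.
        gs.append(pool.spawn(worker, n))
    # Wait for this producer's own greenlets; greenlets from other producers are ignored.
    gevent.joinall(gs)
    # Greenlet.value is the return value of a greenlet that finished successfully
    # (it is None if the greenlet raised).
    return [g.value for g in gs]


if __name__ == '__main__':
    producers = [gevent.spawn(producer, range(0, 5)),
                 gevent.spawn(producer, range(10, 14))]
    gevent.joinall(producers)
    for p in producers:
        print(p.value)
```

Waiting on the producer's own list of greenlets, rather than calling `pool.join()`, matters because the pool is shared: `pool.join()` would also wait for greenlets submitted by the other producers.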