Asyncio worker that handles N jobs at a time?
I am trying to create an asyncio worker class that will consume jobs from a job queue and process up to N jobs in parallel. Some jobs may queue additional jobs. When the job queue is empty and the worker has finished all of its current jobs, it should end.
I am still struggling with asyncio conceptually. Here is one of my attempts, where N=3:
import asyncio, logging, random

async def do_work(id_):
    await asyncio.sleep(random.random())
    return id_

class JobQueue:
    ''' Maintains a list of all pending jobs. '''
    def __init__(self):
        self._queue = asyncio.Queue()
        self._max_id = 10
        for id_ in range(self._max_id):
            self._queue.put_nowait(id_ + 1)

    def add_job(self):
        self._max_id += 1
        self._queue.put_nowait(self._max_id)

    async def get_job(self):
        return await self._queue.get()

    def has_jobs(self):
        return self._queue.qsize() > 0

class JobWorker:
    ''' Processes up to 3 jobs at a time in parallel. '''
    def __init__(self, job_queue):
        self._current_jobs = set()
        self._job_queue = job_queue
        self._semaphore = asyncio.Semaphore(3)

    async def run(self):
        while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
            print('Acquiring semaphore...')
            await self._semaphore.acquire()
            print('Getting a job...')
            job_id = await self._job_queue.get_job()
            print('Scheduling job {}'.format(job_id))
            self._current_jobs.add(job_id)
            task = asyncio.Task(do_work(job_id))
            task.add_done_callback(self.task_finished)

    def task_finished(self, task):
        job_id = task.result()
        print('Finished job {} / released semaphore'.format(job_id))
        self._current_jobs.remove(job_id)
        self._semaphore.release()
        if random.random() < 0.2:
            print('Queuing a new job')
            self._job_queue.add_job()

loop = asyncio.get_event_loop()
jw = JobWorker(JobQueue())
print('Starting event loop')
loop.run_until_complete(jw.run())
print('Event loop ended')
loop.close()
An excerpt of the output:
Starting event loop
Acquiring semaphore...
Getting a job...
Scheduling job 1
Acquiring semaphore...
Getting a job...
Scheduling job 2
Acquiring semaphore...
Getting a job...
Scheduling job 3
Acquiring semaphore...
Finished job 2 / released semaphore
Getting a job...
Scheduling job 4
...snip...
Acquiring semaphore...
Finished job 11 / released semaphore
Getting a job...
Finished job 12 / released semaphore
Finished job 13 / released semaphore
It seems to process all jobs correctly while never working on more than 3 jobs at a time. However, the program hangs after the last job finishes. As the output indicates, it appears to hang at job_id = await self._job_queue.get_job(). Once the job queue is empty, that coroutine never resumes, so the check for an empty queue (at the top of the loop) is never reached again.
I have tried to work around this in several ways, but conceptually something doesn't quite fit. My current WIP passes some futures between the queue and the worker and then uses some combination of asyncio.wait(...) on all of them, but it is getting ugly, and I am wondering whether there is an elegant solution I am overlooking.
You can use asyncio.wait_for to put a simple timeout on get_job, e.g. 1 second, and go back to the top of the loop when it times out.
async def run(self):
    while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
        print('Acquiring semaphore...')
        await self._semaphore.acquire()
        print('Getting a job...')
        try:
            job_id = await asyncio.wait_for(self._job_queue.get_job(), 1)
        except asyncio.TimeoutError:
            # No job arrived within the timeout: give the permit back
            # before re-checking the exit condition at the top of the loop,
            # otherwise each timed-out iteration leaks a semaphore slot.
            self._semaphore.release()
            continue
        print('Scheduling job {}'.format(job_id))
        self._current_jobs.add(job_id)
        task = asyncio.Task(do_work(job_id))
        task.add_done_callback(self.task_finished)
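Note that this is a polling approach: while the queue stays empty, the worker wakes up once per timeout interval just to re-check the exit condition, so shutdown can lag the last finished job by up to the timeout (1 second here). Shortening the timeout reduces that lag at the cost of more wake-ups.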
You can take advantage of queue.task_done, which indicates that a formerly enqueued task is complete. You can then combine queue.join and queue.get using asyncio.wait: if queue.join finishes while queue.get does not, it means all the jobs are complete.
See this example:
class Worker:
    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        # Wait for whichever happens first: a new item arrives (get),
        # or every enqueued item has been marked done (join).
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait([get_task, join_task],
                           return_when=asyncio.FIRST_COMPLETED)
        if get_task.done():
            join_task.cancel()  # still pending; discard it
            return get_task.result()
        # join() won: all jobs are done and no further item is coming.
        get_task.cancel()
        try:
            await get_task  # let the cancellation settle before returning
        except asyncio.CancelledError:
            pass
        # Falls through and returns None, which tells run() to stop.

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()
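For completeness, here is a minimal usage sketch (not part of the answer above) that wires the question's do_work into this Worker; the job IDs and the driver code are purely illustrative:

async def main():
    worker = Worker(do_work)   # up to 3 concurrent jobs by default
    for job_id in range(1, 11):
        worker.put(job_id)     # pre-load ten jobs, as in the question
    await worker.run()         # returns once every job is marked done

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

A job that needs to enqueue follow-up work can call worker.put(...) from inside func: queue.join() will not unblock until those new items have been marked done as well, so run() keeps going until the queue is truly drained.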