Asyncio worker that handles N jobs at a time?

I'm trying to create an asyncio worker class that consumes jobs from a job queue and processes up to N jobs in parallel. Some jobs may enqueue additional jobs. When the job queue is empty and the worker has finished all of its current jobs, it should exit.

I'm still struggling with the concepts of asyncio. Here is one of my attempts, with N=3:

import asyncio, logging, random

async def do_work(id_):
    await asyncio.sleep(random.random())
    return id_

class JobQueue:
    ''' Maintains a list of all pending jobs. '''
    def __init__(self):
        self._queue = asyncio.Queue()
        self._max_id = 10
        for id_ in range(self._max_id):
            self._queue.put_nowait(id_ + 1)

    def add_job(self):
        self._max_id += 1
        self._queue.put_nowait(self._max_id)

    async def get_job(self):
        return await self._queue.get()

    def has_jobs(self):
        return self._queue.qsize() > 0

class JobWorker:
    ''' Processes up to 3 jobs at a time in parallel. '''
    def __init__(self, job_queue):
        self._current_jobs = set()
        self._job_queue = job_queue
        self._semaphore = asyncio.Semaphore(3)

    async def run(self):
        while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
            print('Acquiring semaphore...')
            await self._semaphore.acquire()
            print('Getting a job...')
            job_id = await self._job_queue.get_job()
            print('Scheduling job {}'.format(job_id))
            self._current_jobs.add(job_id)
            task = asyncio.Task(do_work(job_id))
            task.add_done_callback(self.task_finished)

    def task_finished(self, task):
        job_id = task.result()
        print('Finished job {} / released semaphore'.format(job_id))
        self._current_jobs.remove(job_id)
        self._semaphore.release()
        if random.random() < 0.2:
            print('Queuing a new job')
            self._job_queue.add_job()

loop = asyncio.get_event_loop()
jw = JobWorker(JobQueue())
print('Starting event loop')
loop.run_until_complete(jw.run())
print('Event loop ended')
loop.close()

An excerpt of the output:

Starting event loop
Acquiring semaphore...
Getting a job...
Scheduling job 1
Acquiring semaphore...
Getting a job...
Scheduling job 2
Acquiring semaphore...
Getting a job...
Scheduling job 3
Acquiring semaphore...
Finished job 2 / released semaphore
Getting a job...
Scheduling job 4
...snip...
Acquiring semaphore...
Finished job 11 / released semaphore
Getting a job...
Finished job 12 / released semaphore
Finished job 13 / released semaphore

It appears to process all jobs correctly, never handling more than three at a time. However, the program hangs after the last job finishes. As the output suggests, it hangs at job_id = await self._job_queue.get_job(). Once the job queue is empty, this coroutine never resumes, and the check for an empty job queue (at the top of the loop) is never reached again.

I've tried to work around this in a number of ways, but conceptually something doesn't quite fit. My current WIP passes some futures between the queue and the worker and then uses some combination of asyncio.wait(...) on all of them, but it's getting ugly, and I'm wondering whether there's an elegant solution I'm overlooking.

You can time out get_job with a simple asyncio.wait_for, e.g. 1 second, and go back to the top of the loop on timeout:

    async def run(self):
        while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
            print('Acquiring semaphore...')
            await self._semaphore.acquire()
            print('Getting a job...')
            try:
                job_id = await asyncio.wait_for(self._job_queue.get_job(), 1)
            except asyncio.TimeoutError:
                continue
            print('Scheduling job {}'.format(job_id))
            self._current_jobs.add(job_id)
            task = asyncio.Task(do_work(job_id))
            task.add_done_callback(self.task_finished)
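As a standalone illustration of the mechanism (a hypothetical minimal snippet, not part of the worker above), wait_for cancels the pending queue.get() and raises TimeoutError when nothing arrives in time, which is what lets the loop condition be re-checked:

```python
import asyncio

async def demo():
    queue = asyncio.Queue()
    try:
        # No producer ever puts anything, so this get() times out after 0.1s.
        await asyncio.wait_for(queue.get(), 0.1)
    except asyncio.TimeoutError:
        return 'timed out'

print(asyncio.run(demo()))
```

The trade-off is that the worker may linger for up to one extra timeout interval after the last job finishes before it notices the queue is empty.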

You can take advantage of queue.task_done, which indicates that a formerly enqueued task is complete. Then you can combine queue.join and queue.get using asyncio.wait: if queue.join finishes while queue.get does not, it means all the jobs have completed.

See this example:

class Worker:

    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        # Race queue.get() against queue.join(); if join() wins, all jobs are done.
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait([get_task, join_task], return_when=asyncio.FIRST_COMPLETED)
        if get_task.done():
            join_task.cancel()
            return get_task.result()
        get_task.cancel()  # implicitly returns None: the queue has drained

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()
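For completeness, here is a self-contained sketch of how this Worker might be driven, runnable on Python 3.7+. The do_work coroutine and the main driver are assumptions added for demonstration, not part of the original answer:

```python
import asyncio
import random

async def do_work(id_):
    # Simulate a job that takes a short random amount of time.
    await asyncio.sleep(random.random() * 0.05)
    return id_

class Worker:
    ''' Runs queued jobs, at most n at a time; exits when the queue drains. '''

    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return  # queue.join() finished: every job is done
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        # Race queue.get() against queue.join(); if join() wins, all jobs are done.
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait([get_task, join_task],
                           return_when=asyncio.FIRST_COMPLETED)
        if get_task.done():
            join_task.cancel()
            return get_task.result()
        get_task.cancel()  # implicitly returns None: the queue has drained

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()

async def main(n_jobs=10):
    results = []

    async def record(id_):
        results.append(await do_work(id_))

    worker = Worker(record)
    for id_ in range(1, n_jobs + 1):
        worker.put(id_)
    await worker.run()  # returns once all jobs have called task_done
    return results

if __name__ == '__main__':
    print(sorted(asyncio.run(main())))
```

Because run() only exits once queue.join() completes, jobs enqueued by other jobs are handled naturally, and there is no timeout-based polling.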