在后台线程中使用信号量限制同时运行的 asyncio 协程数量

Limiting simultaneously running asyncio coroutines with semaphores in a background thread

作为对 Python 新的 asyncio 模块的一次尝试,我编写了下面的代码片段,用于在后台工作线程中处理一组长时间运行的操作(作业)。

为了控制同时运行的作业数量,我在 `with` 块中引入了一个信号量(原始代码第 56 行,即 `Job._process` 中的 `with (yield from ...)` 语句)。但是,加入信号量之后,已获取的锁似乎永远不会被释放:先前的作业完成后(回调已经执行),等待中的作业并不会启动。当我去掉 `with` 块时,一切都按预期工作。

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):
    """Thread that owns an asyncio event loop and schedules submitted coroutines on it.

    Work is handed over as ``(coroutine factory, callback)`` pairs through a
    ``queue.Queue``; ``process_coros`` polls that queue from inside the loop
    and wraps each factory's coroutine in a task.

    NOTE: this is the listing that exhibits the problem being asked about.
    The semaphore is created in ``__init__`` -- i.e. in whichever thread
    constructs the worker -- before ``run`` has created and installed the
    worker's own event loop. According to the diagnosis given further down
    in this document, that ties the semaphore to the main thread's implicit
    loop instead of the loop the jobs actually run on.
    """

    def __init__(self):
        super().__init__()
        self._keep_running = True       # cleared by stop(); checked by process_coros()
        self._waiting_coros = Queue()   # (coro factory, callback) pairs not yet scheduled
        self._tasks = []                # every task created so far (never pruned)
        self._loop = None    # Loop must be initialized in child thread.
        # Built at construction time, before run() installs the worker loop
        # (see the class docstring for why that matters).
        self.limit_simultaneous_processes = asyncio.Semaphore(2)

    def stop(self):
        """Ask ``process_coros`` to exit; it notices on its next wake-up."""
        self._keep_running = False

    def run(self):
        """Thread body: create a private event loop and run ``process_coros`` on it."""
        self._loop = asyncio.new_event_loop()       # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop)          # Since this is a child thread, we need to do it manually.
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine factory for scheduling on the worker loop.

        ``coro`` is called with no arguments by ``process_coros`` to obtain
        the coroutine; ``callback``, if given, is attached to the resulting
        task as a done-callback.
        """
        self._waiting_coros.put((coro, callback))

    @asyncio.coroutine
    def process_coros(self):
        """Poll the submission queue and turn queued factories into tasks until stopped."""
        while self._keep_running:
            try:
                # Drain everything currently queued; get_nowait() raises Empty
                # once nothing is left, which ends this inner loop.
                while True:
                    coro, callback = self._waiting_coros.get_nowait()
                    # NOTE(review): `asyncio.async` is the legacy spelling of
                    # `asyncio.ensure_future`; `async` is a reserved keyword
                    # from Python 3.7 on, so this line only parses on older
                    # interpreters.
                    task = asyncio.async(coro())
                    if callback:
                        task.add_done_callback(callback)
                    self._tasks.append(task)
            except Empty as e:
                pass
            yield from asyncio.sleep(3)     # sleep so the other tasks can run


# Shared worker instance used by every Job.
# NOTE: instantiated at module level, so BackgroundWorker.__init__ -- including
# its asyncio.Semaphore(2) call -- runs in the importing thread, not in the
# worker thread that later executes run().
background_worker = BackgroundWorker()


class Job(object):
    """A simulated long-running operation identified by ``idx``.

    The job runs on the shared ``background_worker`` and uses that worker's
    ``limit_simultaneous_processes`` semaphore to cap how many jobs are in
    their processing section at once.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Submit this job to the background worker, with a completion callback."""
        background_worker.submit_coro(self._process, callback=self._process_callback)

    @asyncio.coroutine
    def _process(self):
        """Wait for a processing slot, then simulate two seconds of work."""
        # Acquire first; the returned context manager releases the slot on exit.
        slot = yield from background_worker.limit_simultaneous_processes
        with slot:
            print("received processing slot %d" % self._idx)
            started_at = datetime.now()
            yield from asyncio.sleep(2)
            elapsed = datetime.now() - started_at
            print("processing %d took %s" % (self._idx, str(elapsed)))

    def _process_callback(self, future):
        """Done-callback for the job's task; only reports that it fired."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, and wait for the user to type 'quit'."""
    print("starting worker...")
    background_worker.start()

    # Jobs are created and submitted one at a time, in index order.
    for job in map(Job, range(10)):
        job.process()

    # Keep prompting until the exact word 'quit' is entered.
    while input("enter 'quit' to stop the program: \n") != "quit":
        pass

    print("stopping...")
    background_worker.stop()
    background_worker.join()


# Run the interactive demo only when executed as a script, not on import.
if __name__ == '__main__':
    main()

谁能帮我解释一下这种行为?为什么离开 `with` 块时信号量没有递增(即没有被释放)?

我找到了这个错误:信号量是用主线程的隐式事件循环初始化的,而不是用线程启动后在 `run()` 中显式设置的那个事件循环。

修正后的版本:

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):
    """Thread that owns an asyncio event loop and schedules submitted coroutines on it.

    Work is handed over as ``(coroutine factory, callback)`` pairs through a
    thread-safe ``queue.Queue``; ``process_coros`` polls that queue from
    inside the loop and wraps each factory's coroutine in a task.

    Args:
        max_simultaneous: initial value of ``limit_simultaneous_processes``,
            the semaphore jobs use to cap concurrent processing (default 2).
        poll_interval: seconds ``process_coros`` sleeps between polls of the
            submission queue; also the worst-case latency of ``stop``
            (default 3).
    """

    def __init__(self, max_simultaneous=2, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        # Strong references to in-flight tasks; each task removes itself when
        # done so the collection does not grow for the worker's lifetime.
        self._tasks = set()
        self._max_simultaneous = max_simultaneous
        self._poll_interval = poll_interval
        self._loop = None                           # Loop must be initialized in child thread.
        self.limit_simultaneous_processes = None    # Semaphore must be initialized after the loop is set.

    def stop(self):
        """Ask ``process_coros`` to exit; it notices on its next wake-up."""
        self._keep_running = False

    def run(self):
        """Thread body: create a private event loop and run ``process_coros`` on it."""
        self._loop = asyncio.new_event_loop()       # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop)          # Since this is a child thread, we need to do it manually.
        # Created only now, so that it belongs to this thread's loop rather
        # than to the loop of the thread that constructed the worker.
        self.limit_simultaneous_processes = asyncio.Semaphore(self._max_simultaneous)
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine factory for scheduling on the worker loop.

        Args:
            coro: zero-argument callable returning a coroutine; it is called
                on the worker thread, inside the running loop.
            callback: optional done-callback attached to the resulting task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Poll the submission queue and schedule queued work until stopped."""
        while self._keep_running:
            self._schedule_waiting()
            await asyncio.sleep(self._poll_interval)     # sleep so the other tasks can run

    def _schedule_waiting(self):
        """Turn every currently queued submission into a task on the running loop."""
        while True:
            try:
                coro, callback = self._waiting_coros.get_nowait()
            except Empty:
                return
            # ensure_future is the supported replacement for the removed
            # `asyncio.async` spelling.
            task = asyncio.ensure_future(coro())
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
            if callback is not None:
                task.add_done_callback(callback)


# Shared worker instance used by every Job. Constructing it here creates
# neither the event loop nor the semaphore; both are set up in run() once the
# worker thread has started.
background_worker = BackgroundWorker()


class Job:
    """A simulated long-running operation identified by ``idx``.

    The job runs on the shared ``background_worker`` and uses that worker's
    ``limit_simultaneous_processes`` semaphore to cap how many jobs are in
    their processing section at once.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Submit this job to the background worker, with a completion callback."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a processing slot, then simulate two seconds of work."""
        # `async with` acquires the semaphore and releases it on exit; it
        # replaces the removed `with (yield from semaphore)` form.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the job's task; only reports that it fired."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, and wait for the user to type 'quit'.

    The worker is always stopped and joined on the way out -- also when the
    prompt loop is interrupted -- because it is a non-daemon thread and would
    otherwise keep the process alive.
    """
    print("starting worker...")
    background_worker.start()

    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed, so nobody can type 'quit' any more.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()


# Run the interactive demo only when executed as a script, not on import.
if __name__ == '__main__':
    main()