在后台线程中使用信号量限制同时运行的 asyncio 协程数量
Limiting simultaneously running asyncio coroutines with semaphores in a background thread
为了试验 Python 新的 asyncio 模块,我编写了下面的代码片段,用于在后台工作线程中处理一组长时间运行的操作(作业)。
为了控制同时运行的作业数量,我在 with 块中引入了一个信号量(见 Job._process 中的 with 语句)。但是,使用信号量之后,已获取的锁似乎永远不会被释放:作业完成(回调已执行)之后,仍在等待的作业并不会启动。当我去掉 with 块时,一切都按预期工作。
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
    """Thread that runs submitted coroutines on its own asyncio event loop.

    Coroutine functions are handed over from any thread through a
    thread-safe queue (``submit_coro``).  The worker thread polls that queue
    and schedules every waiting coroutine as a task on its private loop.

    Args:
        poll_interval: Seconds to sleep between two polls of the queue.  It
            is also the longest time the worker needs to notice ``stop()``.
    """

    def __init__(self, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._poll_interval = poll_interval
        self._loop = None  # Loop must be initialized in child thread.
        # NOTE: this constructor runs in the thread that creates the worker,
        # not in the worker thread.  On Python < 3.10 asyncio primitives bind
        # to the current event loop when they are constructed, so this
        # semaphore ends up attached to the creating thread's loop instead of
        # the loop made in run(), and coroutines waiting on it are never
        # woken.  It is left here on purpose: this listing demonstrates that
        # problem.  Creating the semaphore in run(), after set_event_loop(),
        # avoids it.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)

    def stop(self):
        """Ask the worker to finish; it exits after its current sleep.

        Tasks that are still running at that point are not awaited.
        """
        self._keep_running = False

    def run(self):
        """Thread body: create the loop and poll the queue until stopped."""
        # Implicit creation of the loop only happens in the main thread.
        self._loop = asyncio.new_event_loop()
        # Since this is a child thread, we need to do it manually.
        asyncio.set_event_loop(self._loop)
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function for execution on the worker loop.

        Args:
            coro: Zero-argument callable returning an awaitable.
            callback: Optional callable invoked with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule every queued coroutine, then sleep; repeat until stopped."""
        while self._keep_running:
            while True:
                try:
                    coro, callback = self._waiting_coros.get_nowait()
                except Empty:
                    break  # Queue drained for this round.
                # ensure_future replaces asyncio.async, which is a syntax
                # error on Python 3.7+ because ``async`` became a keyword.
                task = asyncio.ensure_future(coro())
                if callback:
                    task.add_done_callback(callback)
                # Keep a reference so the task is not garbage collected.
                self._tasks.append(task)
            # Sleep so the scheduled tasks get a chance to run.
            await asyncio.sleep(self._poll_interval)
# Module-level worker shared by all jobs; Job.process and Job._process look it up by this name.
background_worker = BackgroundWorker()
class Job(object):
    """Demo job that sleeps two seconds while holding a worker semaphore slot.

    Args:
        idx: Number used to identify this job in the printed messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Submit this job's coroutine and completion callback to the worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a free slot, then simulate two seconds of work."""
        # ``async with`` releases the semaphore when the block is left, even
        # if the body raises.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the task; ``future`` is the finished task (unused)."""
        print("callback %d triggered" % self._idx)
def main():
    """Start the worker, submit ten jobs and wait until the user types 'quit'.

    The worker is always stopped and joined on the way out.  It is a
    non-daemon thread, so leaving it running (for example after Ctrl-C or a
    closed stdin) would keep the process alive forever.
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed: nobody can type 'quit' any more.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
谁能帮我解释一下这种行为?为什么退出 with 块时信号量没有被释放(计数没有递增)?
我找到了这个错误:信号量是用主线程的隐式事件循环初始化的,而不是用线程启动后在 run() 中显式设置的那个事件循环。
修复后的版本:
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
    """Thread that runs submitted coroutines on its own asyncio event loop.

    Coroutine functions are handed over from any thread through a
    thread-safe queue (``submit_coro``).  The worker thread polls that queue
    and schedules every waiting coroutine as a task on its private loop.
    ``limit_simultaneous_processes`` is a semaphore with two slots that jobs
    can use to cap how many of them do work at once; it is ``None`` until
    the thread has started.

    Args:
        poll_interval: Seconds to sleep between two polls of the queue.  It
            is also the longest time the worker needs to notice ``stop()``.
    """

    def __init__(self, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._poll_interval = poll_interval
        self._loop = None  # Loop must be initialized in child thread.
        # Semaphore must be initialized after the loop is set (see run()).
        self.limit_simultaneous_processes = None

    def stop(self):
        """Ask the worker to finish; it exits after its current sleep.

        Tasks that are still running at that point are not awaited.
        """
        self._keep_running = False

    def run(self):
        """Thread body: create loop and semaphore, then poll until stopped."""
        # Implicit creation of the loop only happens in the main thread.
        self._loop = asyncio.new_event_loop()
        # Since this is a child thread, we need to do it manually.
        asyncio.set_event_loop(self._loop)
        # Created here, in the worker thread and after set_event_loop(), so
        # that the semaphore is associated with the worker's own loop.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function for execution on the worker loop.

        Args:
            coro: Zero-argument callable returning an awaitable.
            callback: Optional callable invoked with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule every queued coroutine, then sleep; repeat until stopped."""
        while self._keep_running:
            while True:
                try:
                    coro, callback = self._waiting_coros.get_nowait()
                except Empty:
                    break  # Queue drained for this round.
                # ensure_future replaces asyncio.async, which is a syntax
                # error on Python 3.7+ because ``async`` became a keyword.
                task = asyncio.ensure_future(coro())
                if callback:
                    task.add_done_callback(callback)
                # Keep a reference so the task is not garbage collected.
                self._tasks.append(task)
            # Sleep so the scheduled tasks get a chance to run.
            await asyncio.sleep(self._poll_interval)
# Module-level worker shared by all jobs; Job.process and Job._process look it up by this name.
background_worker = BackgroundWorker()
class Job(object):
    """Demo job that sleeps two seconds while holding a worker semaphore slot.

    Args:
        idx: Number used to identify this job in the printed messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Submit this job's coroutine and completion callback to the worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a free slot, then simulate two seconds of work."""
        # The semaphore is read here, when the coroutine runs on the worker
        # loop; ``async with`` releases it when the block is left, even if
        # the body raises.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the task; ``future`` is the finished task (unused)."""
        print("callback %d triggered" % self._idx)
def main():
    """Start the worker, submit ten jobs and wait until the user types 'quit'.

    The worker is always stopped and joined on the way out.  It is a
    non-daemon thread, so leaving it running (for example after Ctrl-C or a
    closed stdin) would keep the process alive forever.
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed: nobody can type 'quit' any more.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
为了试验 Python 新的 asyncio 模块,我编写了下面的代码片段,用于在后台工作线程中处理一组长时间运行的操作(作业)。
为了控制同时运行的作业数量,我在 with 块中引入了一个信号量(见 Job._process 中的 with 语句)。但是,使用信号量之后,已获取的锁似乎永远不会被释放:作业完成(回调已执行)之后,仍在等待的作业并不会启动。当我去掉 with 块时,一切都按预期工作。
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
    """Thread that runs submitted coroutines on its own asyncio event loop.

    Coroutine functions are handed over from any thread through a
    thread-safe queue (``submit_coro``).  The worker thread polls that queue
    and schedules every waiting coroutine as a task on its private loop.

    Args:
        poll_interval: Seconds to sleep between two polls of the queue.  It
            is also the longest time the worker needs to notice ``stop()``.
    """

    def __init__(self, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._poll_interval = poll_interval
        self._loop = None  # Loop must be initialized in child thread.
        # NOTE: this constructor runs in the thread that creates the worker,
        # not in the worker thread.  On Python < 3.10 asyncio primitives bind
        # to the current event loop when they are constructed, so this
        # semaphore ends up attached to the creating thread's loop instead of
        # the loop made in run(), and coroutines waiting on it are never
        # woken.  It is left here on purpose: this listing demonstrates that
        # problem.  Creating the semaphore in run(), after set_event_loop(),
        # avoids it.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)

    def stop(self):
        """Ask the worker to finish; it exits after its current sleep.

        Tasks that are still running at that point are not awaited.
        """
        self._keep_running = False

    def run(self):
        """Thread body: create the loop and poll the queue until stopped."""
        # Implicit creation of the loop only happens in the main thread.
        self._loop = asyncio.new_event_loop()
        # Since this is a child thread, we need to do it manually.
        asyncio.set_event_loop(self._loop)
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function for execution on the worker loop.

        Args:
            coro: Zero-argument callable returning an awaitable.
            callback: Optional callable invoked with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule every queued coroutine, then sleep; repeat until stopped."""
        while self._keep_running:
            while True:
                try:
                    coro, callback = self._waiting_coros.get_nowait()
                except Empty:
                    break  # Queue drained for this round.
                # ensure_future replaces asyncio.async, which is a syntax
                # error on Python 3.7+ because ``async`` became a keyword.
                task = asyncio.ensure_future(coro())
                if callback:
                    task.add_done_callback(callback)
                # Keep a reference so the task is not garbage collected.
                self._tasks.append(task)
            # Sleep so the scheduled tasks get a chance to run.
            await asyncio.sleep(self._poll_interval)
# Module-level worker shared by all jobs; Job.process and Job._process look it up by this name.
background_worker = BackgroundWorker()
class Job(object):
    """Demo job that sleeps two seconds while holding a worker semaphore slot.

    Args:
        idx: Number used to identify this job in the printed messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Submit this job's coroutine and completion callback to the worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a free slot, then simulate two seconds of work."""
        # ``async with`` releases the semaphore when the block is left, even
        # if the body raises.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the task; ``future`` is the finished task (unused)."""
        print("callback %d triggered" % self._idx)
def main():
    """Start the worker, submit ten jobs and wait until the user types 'quit'.

    The worker is always stopped and joined on the way out.  It is a
    non-daemon thread, so leaving it running (for example after Ctrl-C or a
    closed stdin) would keep the process alive forever.
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed: nobody can type 'quit' any more.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
谁能帮我解释一下这种行为?为什么退出 with 块时信号量没有被释放(计数没有递增)?
我找到了这个错误:信号量是用主线程的隐式事件循环初始化的,而不是用线程启动后在 run() 中显式设置的那个事件循环。
修复后的版本:
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
    """Thread that runs submitted coroutines on its own asyncio event loop.

    Coroutine functions are handed over from any thread through a
    thread-safe queue (``submit_coro``).  The worker thread polls that queue
    and schedules every waiting coroutine as a task on its private loop.
    ``limit_simultaneous_processes`` is a semaphore with two slots that jobs
    can use to cap how many of them do work at once; it is ``None`` until
    the thread has started.

    Args:
        poll_interval: Seconds to sleep between two polls of the queue.  It
            is also the longest time the worker needs to notice ``stop()``.
    """

    def __init__(self, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._poll_interval = poll_interval
        self._loop = None  # Loop must be initialized in child thread.
        # Semaphore must be initialized after the loop is set (see run()).
        self.limit_simultaneous_processes = None

    def stop(self):
        """Ask the worker to finish; it exits after its current sleep.

        Tasks that are still running at that point are not awaited.
        """
        self._keep_running = False

    def run(self):
        """Thread body: create loop and semaphore, then poll until stopped."""
        # Implicit creation of the loop only happens in the main thread.
        self._loop = asyncio.new_event_loop()
        # Since this is a child thread, we need to do it manually.
        asyncio.set_event_loop(self._loop)
        # Created here, in the worker thread and after set_event_loop(), so
        # that the semaphore is associated with the worker's own loop.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function for execution on the worker loop.

        Args:
            coro: Zero-argument callable returning an awaitable.
            callback: Optional callable invoked with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule every queued coroutine, then sleep; repeat until stopped."""
        while self._keep_running:
            while True:
                try:
                    coro, callback = self._waiting_coros.get_nowait()
                except Empty:
                    break  # Queue drained for this round.
                # ensure_future replaces asyncio.async, which is a syntax
                # error on Python 3.7+ because ``async`` became a keyword.
                task = asyncio.ensure_future(coro())
                if callback:
                    task.add_done_callback(callback)
                # Keep a reference so the task is not garbage collected.
                self._tasks.append(task)
            # Sleep so the scheduled tasks get a chance to run.
            await asyncio.sleep(self._poll_interval)
# Module-level worker shared by all jobs; Job.process and Job._process look it up by this name.
background_worker = BackgroundWorker()
class Job(object):
    """Demo job that sleeps two seconds while holding a worker semaphore slot.

    Args:
        idx: Number used to identify this job in the printed messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Submit this job's coroutine and completion callback to the worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a free slot, then simulate two seconds of work."""
        # The semaphore is read here, when the coroutine runs on the worker
        # loop; ``async with`` releases it when the block is left, even if
        # the body raises.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the task; ``future`` is the finished task (unused)."""
        print("callback %d triggered" % self._idx)
def main():
    """Start the worker, submit ten jobs and wait until the user types 'quit'.

    The worker is always stopped and joined on the way out.  It is a
    non-daemon thread, so leaving it running (for example after Ctrl-C or a
    closed stdin) would keep the process alive forever.
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed: nobody can type 'quit' any more.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()