我如何确保使用 asyncio 一次可以 运行 执行一项特定任务或其他任务?
How can I ensure either one specific task, or the rest, can run at a time with asyncio?
我有一个处理事件的系统。有时可能会发生配置更改,这意味着在重新配置系统时必须停止处理事件。例如(简化和抽象):
class System:
# this will be running multiple times concurrently
async def process_events(self, event):
# this line must not be run while `reconfigure_event_processing` is running
handler = await lookup_event_handler(self.handler_lookup_config, event)
await handler.handle_event(event)
async def reconfigure_event_processing(self, config):
# this code must wait until nothing is using `lookup_event_processing_configuration`
self.handler_lookup_config = config
asyncio
synchronisation primitives 的文档让我很困惑,所以我想知道其中哪一个(如果有的话)解决了这个问题。
并行 process_events
运行ning 的多个实例,处理与 reconfigure_event_processing
的同步取决于您未在问题中显示的代码的外观:是,你如何产生 process_events
任务?
这是我放在一起的一个简短示例,它使用 asyncio.Condition
对象来协调事物。这实际上是一个 运行 可行的例子:
import asyncio
import random
from itertools import count
class System(object):
def __init__(self):
self.condition = asyncio.Condition()
self.flag = False
async def process_events(self, event):
print('start process event', event)
await asyncio.sleep(random.randint(1, 5))
print('end process event', event)
async def reconfigure_event_processing(self, config):
self.flag = True
async with self.condition:
await self.condition.wait()
print('start reconfigure')
await asyncio.sleep(2)
print('end reconfigure')
self.flag = False
self.condition.notify()
async def mainloop(self):
tasks = set()
for i in count():
if self.flag:
print('wait for tasks to complete')
done, pending = await asyncio.wait(tasks)
tasks = pending
print('done waiting for tasks')
async with self.condition:
self.condition.notify()
await self.condition.wait()
tasks.add(asyncio.create_task(self.process_events(i)))
if len(tasks) >= 4:
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
tasks = pending
async def reconfigure(self):
while True:
await asyncio.sleep(random.randint(1, 5))
await self.reconfigure_event_processing('foo')
if __name__ == '__main__':
system = System()
# run mainloop() and reconfigure() concurrently forever
asyncio.run(asyncio.gather([system.mainloop(), system.reconfigure()]))
这里的逻辑本质上是:
reconfigure_event_processing
在需要 运行 时升起一个标志,然后等待 self.condition
.
- 当主循环看到标志时,它等待所有运行ning任务完成,然后通知条件,这导致
reconfigure_event_processing
到运行。
- 与此同时,主循环本身等待条件。
- 当
reconfigure_event_processing
完成时,它通知条件并取消设置标志。
如果你 运行 这个,你会看到 (a) 所有 运行ning 任务在你看到 start reconfigure
输出之前完成,并且 (b) 没有更多任务开始直到您看到 end reconfigure
输出。
很久以后回过头来,我意识到我真正需要的是所谓的read-write lock. For this you should just use a tried and tested library, such as aiorwlock
。
来自维基百科:
An RW lock allows concurrent access for read-only operations, while write operations require exclusive access. This means that multiple threads can read the data in parallel but an exclusive lock is needed for writing or modifying data. When a writer is writing the data, all other writers or readers will be blocked until the writer is finished writing
示例用法如下:
from aiorwlock import RWLock
class System:
def __init__(self):
self.lock = RWLock()
async def process_events(self, event):
async with self.lock.reader_lock:
handler = await lookup_event_handler(self.handler_lookup_config, event)
await handler.handle_event(event)
async def reconfigure_event_processing(self, config):
async with self.lock.writer_lock:
self.handler_lookup_config = config
可以看到我原来的解决方案in the revision history of this answer,但我强烈建议只使用现有的库。
我有一个处理事件的系统。有时可能会发生配置更改,这意味着在重新配置系统时必须停止处理事件。例如(简化和抽象):
class System:
# this will be running multiple times concurrently
async def process_events(self, event):
# this line must not be run while `reconfigure_event_processing` is running
handler = await lookup_event_handler(self.handler_lookup_config, event)
await handler.handle_event(event)
async def reconfigure_event_processing(self, config):
# this code must wait until nothing is using `lookup_event_processing_configuration`
self.handler_lookup_config = config
asyncio
synchronisation primitives 的文档让我很困惑,所以我想知道其中哪一个(如果有的话)解决了这个问题。
并行 process_events
运行ning 的多个实例,处理与 reconfigure_event_processing
的同步取决于您未在问题中显示的代码的外观:是,你如何产生 process_events
任务?
这是我放在一起的一个简短示例,它使用 asyncio.Condition
对象来协调事物。这实际上是一个 运行 可行的例子:
import asyncio
import random
from itertools import count
class System(object):
def __init__(self):
self.condition = asyncio.Condition()
self.flag = False
async def process_events(self, event):
print('start process event', event)
await asyncio.sleep(random.randint(1, 5))
print('end process event', event)
async def reconfigure_event_processing(self, config):
self.flag = True
async with self.condition:
await self.condition.wait()
print('start reconfigure')
await asyncio.sleep(2)
print('end reconfigure')
self.flag = False
self.condition.notify()
async def mainloop(self):
tasks = set()
for i in count():
if self.flag:
print('wait for tasks to complete')
done, pending = await asyncio.wait(tasks)
tasks = pending
print('done waiting for tasks')
async with self.condition:
self.condition.notify()
await self.condition.wait()
tasks.add(asyncio.create_task(self.process_events(i)))
if len(tasks) >= 4:
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
tasks = pending
async def reconfigure(self):
while True:
await asyncio.sleep(random.randint(1, 5))
await self.reconfigure_event_processing('foo')
if __name__ == '__main__':
system = System()
# run mainloop() and reconfigure() concurrently forever
asyncio.run(asyncio.gather([system.mainloop(), system.reconfigure()]))
这里的逻辑本质上是:
reconfigure_event_processing
在需要 运行 时升起一个标志,然后等待self.condition
.- 当主循环看到标志时,它等待所有运行ning任务完成,然后通知条件,这导致
reconfigure_event_processing
到运行。 - 与此同时,主循环本身等待条件。
- 当
reconfigure_event_processing
完成时,它通知条件并取消设置标志。
如果你 运行 这个,你会看到 (a) 所有 运行ning 任务在你看到 start reconfigure
输出之前完成,并且 (b) 没有更多任务开始直到您看到 end reconfigure
输出。
很久以后回过头来,我意识到我真正需要的是所谓的read-write lock. For this you should just use a tried and tested library, such as aiorwlock
。
来自维基百科:
An RW lock allows concurrent access for read-only operations, while write operations require exclusive access. This means that multiple threads can read the data in parallel but an exclusive lock is needed for writing or modifying data. When a writer is writing the data, all other writers or readers will be blocked until the writer is finished writing
示例用法如下:
from aiorwlock import RWLock
class System:
def __init__(self):
self.lock = RWLock()
async def process_events(self, event):
async with self.lock.reader_lock:
handler = await lookup_event_handler(self.handler_lookup_config, event)
await handler.handle_event(event)
async def reconfigure_event_processing(self, config):
async with self.lock.writer_lock:
self.handler_lookup_config = config
可以看到我原来的解决方案in the revision history of this answer,但我强烈建议只使用现有的库。