1-item asyncio queue - 这是一些标准的东西吗?
1-item asyncio queue - is this some standard thing?
在我的一个 asyncio 项目中,我经常使用一种同步方法,我想知道它是否是某种标准工具,我可以给 google 起个名字以了解更多信息。我使用术语“1-item queue”只是因为我没有更好的名字。这是一个降级队列,与 Queue(maxsize=1)
.
无关
# [controller] ---- commands ---> [worker]
controller 向 worker 发送命令(queue.put
,实际上是 put_nowait
),worker 等待它们(queue.get
)并执行它们,但特殊的规则是只有最后一条命令很重要,它会立即替换所有之前未完成的命令。因此,队列中等待执行的命令永远不会超过 1 个。
为了实现这一点,控制器在放置之前清除队列。没有 queue.clear
,因此它必须丢弃(使用 get_nowait
)等待项(如果有)。 (queue.clear
的缺席让我产生了这个问题的疑虑。)
在 worker 端,如果命令执行需要休眠,它会被 newcmd=queue.get
替换为超时。当超时发生时,是休眠;当 get
成功时,当前工作中止并开始执行 newcmd
。
您使用的队列类型不标准 - 有一次性队列这样的东西,但完全不同。
队列并不真正适合您的用例,尽管您付出了一些努力使其工作。你真的不需要任何类型的排队,你需要一个容纳单个对象(可以被替换)的槽和一个唤醒机制。 asyncio.Event
可用于唤醒,您可以将负载对象(命令)附加到事件的属性。例如:
async def worker(evt):
while True:
await evt.wait()
evt.clear()
if evt.last_command is None:
continue
last_command = evt.last_command
evt.last_command = None
# execute last_command, possibly with timeout
print(last_command)
async def main():
evt = asyncio.Event()
workers = [asyncio.create_task(worker(evt)) for _ in range(5)]
for i in itertools.count():
await asyncio.sleep(1)
evt.last_command = f"foo {i}"
evt.set()
asyncio.run(main())
这与基于队列的方法的一个区别是设置事件将唤醒所有工作人员(如果有多个工作人员),即使第一个工作人员立即调用 evt.clear()
。另一方面,将保证将队列项目移交给 queue.get()
.
的单个等待者
在我的一个 asyncio 项目中,我经常使用一种同步方法,我想知道它是否是某种标准工具,我可以给 google 起个名字以了解更多信息。我使用术语“1-item queue”只是因为我没有更好的名字。这是一个降级队列,与 Queue(maxsize=1)
.
# [controller] ---- commands ---> [worker]
controller 向 worker 发送命令(queue.put
,实际上是 put_nowait
),worker 等待它们(queue.get
)并执行它们,但特殊的规则是只有最后一条命令很重要,它会立即替换所有之前未完成的命令。因此,队列中等待执行的命令永远不会超过 1 个。
为了实现这一点,控制器在放置之前清除队列。没有 queue.clear
,因此它必须丢弃(使用 get_nowait
)等待项(如果有)。 (queue.clear
的缺席让我产生了这个问题的疑虑。)
在 worker 端,如果命令执行需要休眠,它会被 newcmd=queue.get
替换为超时。当超时发生时,是休眠;当 get
成功时,当前工作中止并开始执行 newcmd
。
您使用的队列类型不标准 - 有一次性队列这样的东西,但完全不同。
队列并不真正适合您的用例,尽管您付出了一些努力使其工作。你真的不需要任何类型的排队,你需要一个容纳单个对象(可以被替换)的槽和一个唤醒机制。 asyncio.Event
可用于唤醒,您可以将负载对象(命令)附加到事件的属性。例如:
async def worker(evt):
while True:
await evt.wait()
evt.clear()
if evt.last_command is None:
continue
last_command = evt.last_command
evt.last_command = None
# execute last_command, possibly with timeout
print(last_command)
async def main():
evt = asyncio.Event()
workers = [asyncio.create_task(worker(evt)) for _ in range(5)]
for i in itertools.count():
await asyncio.sleep(1)
evt.last_command = f"foo {i}"
evt.set()
asyncio.run(main())
这与基于队列的方法的一个区别是设置事件将唤醒所有工作人员(如果有多个工作人员),即使第一个工作人员立即调用 evt.clear()
。另一方面,将保证将队列项目移交给 queue.get()
.