同步异步队列
Synchronize asyncio Queue
我计划采用基于异步队列的生产者-消费者实现来处理实时数据,其中以正确的时间顺序发送数据至关重要。所以这是它的代码片段:
async def produce(Q, n_jobs):
for i in range(n_jobs):
print(f"Producing :{i}")
await Q.put(i)
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
print(f"Finished :{n} and Result: {x}")
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
这里producer生产数据,放入asyncio Queue中。我有多个消费者来消费和处理数据。在查看输出时,在打印“Consumed :{n}”(如 1,2,3,4... 等)时保持顺序,这完全没问题。但是,由于函数 do_sometask_and_return_the_result(n) 需要可变时间到 return 结果,因此在下一次打印 n“Finished :{n}”(如 2,1, 4,3,5,...).
有什么方法可以同步这些数据,因为我需要保持打印结果的顺序?我想在 do_sometask_and_return_the_result(n).
之后看到 'n' 的 1,2,3,4,.. 顺序打印
您可以使用优先队列系统(使用 python heapq
库)在作业完成后重新排序。像这样。
# add these variables at class/global scope
priority_queue = []
current_job_id = 1
job_id_dict = {}
async def produce(Q, n_jobs):
# same as above
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
await process_result(n, x)
async def process_result(n, x):
heappush(priority_queue, n)
job_id_dict[n] = x
while current_job_id == priority_queue[0]:
job_id = heappop(priority_queue)
print(f"Finished :{job_id} and Result: {job_id_dict[job_id]}")
current_job_id += 1
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
有关 heapq
模块的更多信息:https://docs.python.org/3/library/heapq.html
我计划采用基于异步队列的生产者-消费者实现来处理实时数据,其中以正确的时间顺序发送数据至关重要。所以这是它的代码片段:
async def produce(Q, n_jobs):
for i in range(n_jobs):
print(f"Producing :{i}")
await Q.put(i)
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
print(f"Finished :{n} and Result: {x}")
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
这里producer生产数据,放入asyncio Queue中。我有多个消费者来消费和处理数据。在查看输出时,在打印“Consumed :{n}”(如 1,2,3,4... 等)时保持顺序,这完全没问题。但是,由于函数 do_sometask_and_return_the_result(n) 需要可变时间到 return 结果,因此在下一次打印 n“Finished :{n}”(如 2,1, 4,3,5,...).
有什么方法可以同步这些数据,因为我需要保持打印结果的顺序?我想在 do_sometask_and_return_the_result(n).
之后看到 'n' 的 1,2,3,4,.. 顺序打印您可以使用优先队列系统(使用 python heapq
库)在作业完成后重新排序。像这样。
# add these variables at class/global scope
priority_queue = []
current_job_id = 1
job_id_dict = {}
async def produce(Q, n_jobs):
# same as above
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
await process_result(n, x)
async def process_result(n, x):
heappush(priority_queue, n)
job_id_dict[n] = x
while current_job_id == priority_queue[0]:
job_id = heappop(priority_queue)
print(f"Finished :{job_id} and Result: {job_id_dict[job_id]}")
current_job_id += 1
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
有关 heapq
模块的更多信息:https://docs.python.org/3/library/heapq.html