Why is my consumer working separately from my producer in the queue?
My objective is to call an API asynchronously and write the result of each call to a file (1 call -> 1 file). One way to achieve that, I figured, is to use a queue. My intention is for the producers to push each response onto the queue as soon as it is ready, and for the consumer to process (write) each item as soon as it becomes available.
Confusion: when I run my code and watch the print statements, I see the producers finish first, and only then does the consumer start working through my output. That seems to contradict my intention of having the consumer pick up tasks as soon as they become available. I have also considered using multiple processes (one for the consumer, one for the producer), but I am not sure whether that would overcomplicate things.
I created an illustration of the current state:
import aiohttp
import asyncio

async def get_data(session, day):
    async with session.post(url=SOME_URL, json=SOME_FORMAT, headers=HEADERS) as response:
        return await response.text()

async def producer(q, day):
    async with aiohttp.ClientSession() as session:
        result = await get_data(session, day)
        await q.put(result)

async def consumer(q):
    while True:
        outcome = await q.get()
        print("Consumed:", outcome)  # assuming I write files here
        q.task_done()

async def main():
    queue = asyncio.Queue()
    days = [day for day in range(20)]  # Here I normally use calendar dates instead of range
    producers = [asyncio.create_task(producer(queue, day) for day in days]
    consumer = asyncio.create_task(consumer(queue)
    await asyncio.gather(*producers)
    await queue.join()
    consumer.cancel()

if __name__ == '__main__':
    asyncio.run(main())
Am I on the right track?
Your code is mostly fine (apart from a couple of syntax errors, which I'm guessing came from a bad copy-paste). All the producers are indeed created before the consumer starts working, because initially they have nothing to await on. But once the producers have actual work to do, you will see that they finish that work interleaved with the consumer, which picks up results (and would write the files) as soon as they arrive.
Here is an edited version of your code, along with output demonstrating that everything works as expected.
import aiohttp
import asyncio

async def get_data(session, day):
    print(f"get data, day {day}")
    async with session.get(url="https://www.google.com") as response:
        res = await response.text()
    print(f"got data, day {day}")
    return res[:100]

async def producer(q, day):
    async with aiohttp.ClientSession() as session:
        result = await get_data(session, day)
        await q.put(result)

async def consumer(q):
    print("Consumer stated")
    while True:
        outcome = await q.get()
        print("Consumed:", outcome)  # assuming I write files here
        asyncio.sleep(1)
        q.task_done()

async def main():
    queue = asyncio.Queue()
    days = [day for day in range(20)]  # Here I normally use calendar dates instead of range
    producers = [asyncio.create_task(producer(queue, day)) for day in days]
    print("main: producer tasks created")
    consumer_task = asyncio.create_task(consumer(queue))
    print("main: consumer task created")
    await asyncio.gather(*producers)
    print("main: gathered producers")
    await queue.join()
    consumer_task.cancel()

if __name__ == '__main__':
    asyncio.run(main())
Output:
main: producer tasks created
main: consumer task created
get data, day 0
get data, day 1
get data, day 2
get data, day 3
...
get data, day 19
Consumer stated
got data, day 1
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
queue_so.py:21: RuntimeWarning: coroutine 'sleep' was never awaited
asyncio.sleep(1)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
got data, day 10
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 19
got data, day 11
got data, day 14
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 15
got data, day 17
got data, day 6
got data, day 18
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 7
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 8
got data, day 9
got data, day 2
got data, day 12
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 0
got data, day 5
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 4
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 3
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 13
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 16
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
main: gathered producers
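As an aside, the RuntimeWarning in the output above appears because `asyncio.sleep(1)` in the consumer was called without `await`, so the coroutine is never actually run. Below is a hypothetical, self-contained sketch of the same queue pattern adapted to the original goal of one file per call; the API call is simulated with `asyncio.sleep`, and `OUT_DIR`, the file naming, and the payload format are assumptions for illustration, not the poster's actual code.

```python
import asyncio
import pathlib
import random

OUT_DIR = pathlib.Path("results")  # assumed output directory

async def produce(q: asyncio.Queue, day: int) -> None:
    # Simulate an API call that takes a variable amount of time.
    await asyncio.sleep(random.uniform(0.01, 0.05))
    await q.put((day, f"payload for day {day}"))

async def consume(q: asyncio.Queue) -> None:
    while True:
        day, payload = await q.get()
        # One call -> one file. The write itself is blocking, but for
        # small files that is usually acceptable inside a coroutine.
        (OUT_DIR / f"day_{day}.txt").write_text(payload)
        q.task_done()

async def main() -> None:
    OUT_DIR.mkdir(exist_ok=True)
    queue: asyncio.Queue = asyncio.Queue()
    producers = [asyncio.create_task(produce(queue, day)) for day in range(20)]
    consumer_task = asyncio.create_task(consume(queue))
    await asyncio.gather(*producers)  # all results produced
    await queue.join()               # all results consumed
    consumer_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())
```

Because the simulated calls finish at different times, the consumer writes each file as its result arrives, interleaved with the remaining producers, which is the behaviour the question was after.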