Asyncio.Queue 消费者未接到电话
Asyncio.Queue consumer not getting called
我有一个 asyncio.Queue 生产者和消费者 运行ning 作为 2 个无限循环。生产者定期将作业添加到队列中,消费者等待直到有作业可用,然后处理它,然后等待下一个作业。
但是,由于某种原因,我的消费者没有接到电话。我认为这是因为生产者任务永远不会让给消费者?
关于如何解决这个问题,让两个工作人员 运行 都像描述的那样在后台工作有什么想法吗?
import asyncio
import concurrent.futures
import time
class Consumer:
def __init__(self, queue: asyncio.Queue):
self._duration_before_restart_ms = 3000
self._queue = queue
self._last_triggered_time_ms = 0
async def consumer_loop(self):
while True:
print("Consumer new iteration.")
detected_time_ms = await self._queue.get()
print("Consumer new event: ", detected_time_ms)
if (
detected_time_ms - self._duration_before_restart_ms
< self._last_triggered_time_ms
):
print("Consumer skipping event: ", detected_time_ms)
# Invalidate all items in queue that happened before
# _last_triggered_time_ms.
continue
print("Consumer processing event: ", detected_time_ms)
# Simulate authentication (an io bound operation) with sleep.
time.sleep(5)
self._last_triggered_time_ms = int(time.time() * 1000)
print(
"Consumer processed event: ",
detected_time_ms,
" at: ",
self._last_triggered_time_ms,
)
class Producer:
def __init__(self, queue: asyncio.Queue):
self._detection_time_period_ms = 3000
self._last_detection_time_ms = 0
self._queue = queue
async def producer_loop(self):
counter = 0
while True:
# Iterates at 2fps
time.sleep(0.5)
print("Producer counter: ", counter)
current_time_ms = int(time.time() * 1000)
if (counter % 10 > 5) and (
self._last_detection_time_ms + self._detection_time_period_ms
< current_time_ms
):
print("Producer adding to queue: ", current_time_ms)
await self._queue.put(current_time_ms)
print("Producer added to queue: ", current_time_ms)
self._last_detection_time_ms = current_time_ms
counter += 1
async def main():
q = asyncio.Queue()
producer = Producer(q)
consumer = Consumer(q)
producer_task = asyncio.create_task(producer.producer_loop())
consumer_task = asyncio.create_task(consumer.consumer_loop())
if __name__ == "__main__":
asyncio.run(main())
如果我使用 await asyncio.sleep()
而不是 time.sleep()
,代码对我有用
在 async
taks 不会同时 运行 但它应该在看到 await
时切换任务 - 似乎它需要 await asyncio.sleep()
是时候从生产者切换到客户,然后再从客户切换回生产者。
您在 put()
和 get()
中有 await
,但我无法解释为什么它不切换任务。也许它 switchc 但它切换太快并且它没有足够的时间发送队列中的数据。
import asyncio
import concurrent.futures
class Consumer:
def __init__(self, queue: asyncio.Queue):
self._duration_before_restart_ms = 3000
self._queue = queue
self._last_triggered_time_ms = 0
async def consumer_loop(self):
print('start consumer')
while True:
print("Consumer new iteration.")
detected_time_ms = await self._queue.get()
print("Consumer new event: ", detected_time_ms)
if (
detected_time_ms - self._duration_before_restart_ms
< self._last_triggered_time_ms
):
print("Consumer skipping event: ", detected_time_ms)
# Invalidate all items in queue that happened before
# _last_triggered_time_ms.
continue
print("Consumer processing event: ", detected_time_ms)
# Simulate authentication (an io bound operation) with sleep.
await asyncio.sleep(5)
self._last_triggered_time_ms = int(time.time() * 1000)
print(
"Consumer processed event: ",
detected_time_ms,
" at: ",
self._last_triggered_time_ms,
)
class Producer:
def __init__(self, queue: asyncio.Queue):
self._detection_time_period_ms = 3000
self._last_detection_time_ms = 0
self._queue = queue
async def producer_loop(self):
print('start producer')
counter = 0
while True:
# Iterates at 2fps
await asyncio.sleep(0.5)
print("Producer counter: ", counter)
current_time_ms = int(time.time() * 1000)
if (counter % 10 > 5) and (
self._last_detection_time_ms + self._detection_time_period_ms
< current_time_ms
):
print("Producer adding to queue: ", current_time_ms)
await self._queue.put(current_time_ms)
print("Producer added to queue: ", current_time_ms)
self._last_detection_time_ms = current_time_ms
counter += 1
async def main():
q = asyncio.Queue()
producer = Producer(q)
consumer = Consumer(q)
producer_task = asyncio.create_task(producer.producer_loop())
consumer_task = asyncio.create_task(consumer.consumer_loop())
# wait for end of task
await asyncio.gather(producer_task)
if __name__ == "__main__":
asyncio.run(main())
我有一个 asyncio.Queue 生产者和消费者 运行ning 作为 2 个无限循环。生产者定期将作业添加到队列中,消费者等待直到有作业可用,然后处理它,然后等待下一个作业。
但是,由于某种原因,我的消费者没有接到电话。我认为这是因为生产者任务永远不会让给消费者?
关于如何解决这个问题,让两个工作人员 运行 都像描述的那样在后台工作有什么想法吗?
import asyncio
import concurrent.futures
import time
class Consumer:
def __init__(self, queue: asyncio.Queue):
self._duration_before_restart_ms = 3000
self._queue = queue
self._last_triggered_time_ms = 0
async def consumer_loop(self):
while True:
print("Consumer new iteration.")
detected_time_ms = await self._queue.get()
print("Consumer new event: ", detected_time_ms)
if (
detected_time_ms - self._duration_before_restart_ms
< self._last_triggered_time_ms
):
print("Consumer skipping event: ", detected_time_ms)
# Invalidate all items in queue that happened before
# _last_triggered_time_ms.
continue
print("Consumer processing event: ", detected_time_ms)
# Simulate authentication (an io bound operation) with sleep.
time.sleep(5)
self._last_triggered_time_ms = int(time.time() * 1000)
print(
"Consumer processed event: ",
detected_time_ms,
" at: ",
self._last_triggered_time_ms,
)
class Producer:
def __init__(self, queue: asyncio.Queue):
self._detection_time_period_ms = 3000
self._last_detection_time_ms = 0
self._queue = queue
async def producer_loop(self):
counter = 0
while True:
# Iterates at 2fps
time.sleep(0.5)
print("Producer counter: ", counter)
current_time_ms = int(time.time() * 1000)
if (counter % 10 > 5) and (
self._last_detection_time_ms + self._detection_time_period_ms
< current_time_ms
):
print("Producer adding to queue: ", current_time_ms)
await self._queue.put(current_time_ms)
print("Producer added to queue: ", current_time_ms)
self._last_detection_time_ms = current_time_ms
counter += 1
async def main():
q = asyncio.Queue()
producer = Producer(q)
consumer = Consumer(q)
producer_task = asyncio.create_task(producer.producer_loop())
consumer_task = asyncio.create_task(consumer.consumer_loop())
if __name__ == "__main__":
asyncio.run(main())
如果我使用 await asyncio.sleep()
而不是 time.sleep()
在 async
taks 不会同时 运行 但它应该在看到 await
时切换任务 - 似乎它需要 await asyncio.sleep()
是时候从生产者切换到客户,然后再从客户切换回生产者。
您在 put()
和 get()
中有 await
,但我无法解释为什么它不切换任务。也许它 switchc 但它切换太快并且它没有足够的时间发送队列中的数据。
import asyncio
import concurrent.futures
class Consumer:
def __init__(self, queue: asyncio.Queue):
self._duration_before_restart_ms = 3000
self._queue = queue
self._last_triggered_time_ms = 0
async def consumer_loop(self):
print('start consumer')
while True:
print("Consumer new iteration.")
detected_time_ms = await self._queue.get()
print("Consumer new event: ", detected_time_ms)
if (
detected_time_ms - self._duration_before_restart_ms
< self._last_triggered_time_ms
):
print("Consumer skipping event: ", detected_time_ms)
# Invalidate all items in queue that happened before
# _last_triggered_time_ms.
continue
print("Consumer processing event: ", detected_time_ms)
# Simulate authentication (an io bound operation) with sleep.
await asyncio.sleep(5)
self._last_triggered_time_ms = int(time.time() * 1000)
print(
"Consumer processed event: ",
detected_time_ms,
" at: ",
self._last_triggered_time_ms,
)
class Producer:
def __init__(self, queue: asyncio.Queue):
self._detection_time_period_ms = 3000
self._last_detection_time_ms = 0
self._queue = queue
async def producer_loop(self):
print('start producer')
counter = 0
while True:
# Iterates at 2fps
await asyncio.sleep(0.5)
print("Producer counter: ", counter)
current_time_ms = int(time.time() * 1000)
if (counter % 10 > 5) and (
self._last_detection_time_ms + self._detection_time_period_ms
< current_time_ms
):
print("Producer adding to queue: ", current_time_ms)
await self._queue.put(current_time_ms)
print("Producer added to queue: ", current_time_ms)
self._last_detection_time_ms = current_time_ms
counter += 1
async def main():
q = asyncio.Queue()
producer = Producer(q)
consumer = Consumer(q)
producer_task = asyncio.create_task(producer.producer_loop())
consumer_task = asyncio.create_task(consumer.consumer_loop())
# wait for end of task
await asyncio.gather(producer_task)
if __name__ == "__main__":
asyncio.run(main())