Asyncio.Queue 消费者未接到电话

Asyncio.Queue consumer not getting called

我有一个 asyncio.Queue 生产者和消费者 运行ning 作为 2 个无限循环。生产者定期将作业添加到队列中,消费者等待直到有作业可用,然后处理它,然后等待下一个作业。

但是,由于某种原因,我的消费者没有接到电话。我认为这是因为生产者任务永远不会让给消费者?

关于如何解决这个问题,让两个工作人员 运行 都像描述的那样在后台工作有什么想法吗?

import asyncio
import concurrent.futures
import time

class Consumer:
    def __init__(self, queue: asyncio.Queue):
        self._duration_before_restart_ms = 3000
        self._queue = queue
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        while True:
            print("Consumer new iteration.")
            detected_time_ms = await self._queue.get()
            print("Consumer new event: ", detected_time_ms)
            if (
                detected_time_ms - self._duration_before_restart_ms
                < self._last_triggered_time_ms
            ):
                print("Consumer skipping event: ", detected_time_ms)
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                continue
            print("Consumer processing event: ", detected_time_ms)
            # Simulate authentication (an io bound operation) with sleep.
            time.sleep(5)
            self._last_triggered_time_ms = int(time.time() * 1000)
            print(
                "Consumer processed event: ",
                detected_time_ms,
                " at: ",
                self._last_triggered_time_ms,
            )


class Producer:
    def __init__(self, queue: asyncio.Queue):
        self._detection_time_period_ms = 3000
        self._last_detection_time_ms = 0
        self._queue = queue

    async def producer_loop(self):
        counter = 0
        while True:
            # Iterates at 2fps
            time.sleep(0.5)
            print("Producer counter: ", counter)
            current_time_ms = int(time.time() * 1000)

            if (counter % 10 > 5) and (
                self._last_detection_time_ms + self._detection_time_period_ms
                < current_time_ms
            ):
                print("Producer adding to queue: ", current_time_ms)
                await self._queue.put(current_time_ms)
                print("Producer added to queue: ", current_time_ms)
                self._last_detection_time_ms = current_time_ms
            counter += 1


async def main():
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())


if __name__ == "__main__":
    asyncio.run(main())

如果我使用 await asyncio.sleep() 而不是 time.sleep()

,代码对我有用

async taks 不会同时 运行 但它应该在看到 await 时切换任务 - 似乎它需要 await asyncio.sleep()是时候从生产者切换到客户,然后再从客户切换回生产者。

您在 put()get() 中有 await,但我无法解释为什么它不切换任务。也许它 switchc 但它切换太快并且它没有足够的时间发送队列中的数据。


import asyncio
import concurrent.futures

class Consumer:

    def __init__(self, queue: asyncio.Queue):
        self._duration_before_restart_ms = 3000
        self._queue = queue
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        print('start consumer')

        while True:
            print("Consumer new iteration.")
            detected_time_ms = await self._queue.get()
            print("Consumer new event: ", detected_time_ms)
            if (
                detected_time_ms - self._duration_before_restart_ms
                < self._last_triggered_time_ms
            ):
                print("Consumer skipping event: ", detected_time_ms)
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                continue
            print("Consumer processing event: ", detected_time_ms)
            # Simulate authentication (an io bound operation) with sleep.
            await asyncio.sleep(5)
            self._last_triggered_time_ms = int(time.time() * 1000)
            print(
                "Consumer processed event: ",
                detected_time_ms,
                " at: ",
                self._last_triggered_time_ms,
            )


class Producer:

    def __init__(self, queue: asyncio.Queue):
        self._detection_time_period_ms = 3000
        self._last_detection_time_ms = 0
        self._queue = queue

    async def producer_loop(self):
        print('start producer')

        counter = 0
        while True:
            # Iterates at 2fps
            await asyncio.sleep(0.5)
            print("Producer counter: ", counter)
            current_time_ms = int(time.time() * 1000)

            if (counter % 10 > 5) and (
                self._last_detection_time_ms + self._detection_time_period_ms
                < current_time_ms
            ):
                print("Producer adding to queue: ", current_time_ms)
                await self._queue.put(current_time_ms)
                print("Producer added to queue: ", current_time_ms)
                self._last_detection_time_ms = current_time_ms
            counter += 1


async def main():
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())

    # wait for end of task
    await asyncio.gather(producer_task)
    
if __name__ == "__main__":
    asyncio.run(main())