从请求处理程序将项目添加到异步队列

Add item to asyncio queue from a request handler

我有一个 TCP 服务器 运行 并且有一个处理函数需要获取请求的内容,将其添加到异步队列并以 OK 状态回复。

在后台,我有一个异步协同程序 运行,它检测何时添加新项目并执行一些处理。

如何从处理函数将项目放入异步队列中,这不是也不能是异步协程?

我是 运行 DICOM 服务器 pynetdicom,它在端口 104 上侦听传入的 TCP 请求(特别是 DICOM C-STORE)。

我需要将请求的内容保存到队列中,并 return 一个 0x0000 响应,以便侦听器可用于网络。

这是按照生产者-消费者模式建模的。

我试图定义一个消费者协程 consume_dicom(),但由于我无法正确定义生产者,目前卡在 await queue.get() 中。

生产者需要简单地调用 queue.put(produce_item),但这发生在 handle_store(event) 函数内部,该函数不是 event_loop 的一部分,但每次服务器收到请求时都会调用.

import asyncio

from pynetdicom import (
    AE, evt,
    StoragePresentationContexts
)

class PacsServer():
    def __init__(self, par, listen=True):
        # Initialize other stuff...

        # Initialize DICOM server
        ae = AE(ae_title='DICOM-NODE')
        ae.supported_contexts = StoragePresentationContexts

        # When a C-STORE request comes, it will be passed to self.handle_store
        handlers = [(evt.EVT_C_STORE, self.handle_store)]

        # Define queue
        loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=loop)

        # Define consumer
        loop.create_task(self.consume_dicom(self.queue))

        # Start server in the background with specified handlers
        self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)

        # Start async loop
        self.loop.run_forever()



    def handle_store(self, event):
        # Request handling
        ds = event.dataset

        # Here I want to add to the queue but this is not an async method
        await queue.put(ds)

        return 0x0000


    async def consume_dicom(self, queue):
        while True:
            print(f"AWAITING FROM QUEUE")
            ds = await queue.get()

            do_some_processing(ds)

我想找到一种方法来将项目添加到队列中,并且 return handle_store() 函数中的 OK 状态。

由于handle_store在不同的线程中是运行,它需要告诉事件循环将项目入队。这是通过 call_soon_threadsafe:

完成的
self.loop.call_soon_threadsafe(queue.put_nowait, ds)

注意需要调用queue.put_nowait而不是queue.put,因为前者是函数而不是协程。对于无界队列(默认),该函数将始终成功,否则如果队列已满,它将引发异常。