从请求处理程序将项目添加到异步队列
Add item to asyncio queue from a request handler
我有一个 TCP 服务器 运行 并且有一个处理函数需要获取请求的内容,将其添加到异步队列并以 OK 状态回复。
在后台,我有一个异步协同程序 运行,它检测何时添加新项目并执行一些处理。
如何从处理函数将项目放入异步队列中,这不是也不能是异步协程?
我是 运行 DICOM 服务器 pynetdicom,它在端口 104 上侦听传入的 TCP 请求(特别是 DICOM C-STORE
)。
我需要将请求的内容保存到队列中,并 return 一个 0x0000
响应,以便侦听器可用于网络。
这是按照生产者-消费者模式建模的。
我试图定义一个消费者协程 consume_dicom()
,但由于我无法正确定义生产者,目前卡在 await queue.get()
中。
生产者需要简单地调用 queue.put(produce_item)
,但这发生在 handle_store(event)
函数内部,该函数不是 event_loop
的一部分,但每次服务器收到请求时都会调用.
import asyncio
from pynetdicom import (
AE, evt,
StoragePresentationContexts
)
class PacsServer():
def __init__(self, par, listen=True):
# Initialize other stuff...
# Initialize DICOM server
ae = AE(ae_title='DICOM-NODE')
ae.supported_contexts = StoragePresentationContexts
# When a C-STORE request comes, it will be passed to self.handle_store
handlers = [(evt.EVT_C_STORE, self.handle_store)]
# Define queue
loop = asyncio.get_event_loop()
self.queue = asyncio.Queue(loop=loop)
# Define consumer
loop.create_task(self.consume_dicom(self.queue))
# Start server in the background with specified handlers
self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)
# Start async loop
self.loop.run_forever()
def handle_store(self, event):
# Request handling
ds = event.dataset
# Here I want to add to the queue but this is not an async method
await queue.put(ds)
return 0x0000
async def consume_dicom(self, queue):
while True:
print(f"AWAITING FROM QUEUE")
ds = await queue.get()
do_some_processing(ds)
我想找到一种方法来将项目添加到队列中,并且 return handle_store()
函数中的 OK 状态。
由于handle_store
在不同的线程中是运行,它需要告诉事件循环将项目入队。这是通过 call_soon_threadsafe
:
完成的
self.loop.call_soon_threadsafe(queue.put_nowait, ds)
注意需要调用queue.put_nowait
而不是queue.put
,因为前者是函数而不是协程。对于无界队列(默认),该函数将始终成功,否则如果队列已满,它将引发异常。
我有一个 TCP 服务器 运行 并且有一个处理函数需要获取请求的内容,将其添加到异步队列并以 OK 状态回复。
在后台,我有一个异步协同程序 运行,它检测何时添加新项目并执行一些处理。
如何从处理函数将项目放入异步队列中,这不是也不能是异步协程?
我是 运行 DICOM 服务器 pynetdicom,它在端口 104 上侦听传入的 TCP 请求(特别是 DICOM C-STORE
)。
我需要将请求的内容保存到队列中,并 return 一个 0x0000
响应,以便侦听器可用于网络。
这是按照生产者-消费者模式建模的。
我试图定义一个消费者协程 consume_dicom()
,但由于我无法正确定义生产者,目前卡在 await queue.get()
中。
生产者需要简单地调用 queue.put(produce_item)
,但这发生在 handle_store(event)
函数内部,该函数不是 event_loop
的一部分,但每次服务器收到请求时都会调用.
import asyncio
from pynetdicom import (
AE, evt,
StoragePresentationContexts
)
class PacsServer():
def __init__(self, par, listen=True):
# Initialize other stuff...
# Initialize DICOM server
ae = AE(ae_title='DICOM-NODE')
ae.supported_contexts = StoragePresentationContexts
# When a C-STORE request comes, it will be passed to self.handle_store
handlers = [(evt.EVT_C_STORE, self.handle_store)]
# Define queue
loop = asyncio.get_event_loop()
self.queue = asyncio.Queue(loop=loop)
# Define consumer
loop.create_task(self.consume_dicom(self.queue))
# Start server in the background with specified handlers
self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)
# Start async loop
self.loop.run_forever()
def handle_store(self, event):
# Request handling
ds = event.dataset
# Here I want to add to the queue but this is not an async method
await queue.put(ds)
return 0x0000
async def consume_dicom(self, queue):
while True:
print(f"AWAITING FROM QUEUE")
ds = await queue.get()
do_some_processing(ds)
我想找到一种方法来将项目添加到队列中,并且 return handle_store()
函数中的 OK 状态。
由于handle_store
在不同的线程中是运行,它需要告诉事件循环将项目入队。这是通过 call_soon_threadsafe
:
self.loop.call_soon_threadsafe(queue.put_nowait, ds)
注意需要调用queue.put_nowait
而不是queue.put
,因为前者是函数而不是协程。对于无界队列(默认),该函数将始终成功,否则如果队列已满,它将引发异常。