python 和 asyncua 中的 Asyncio 队列
Asyncio queues in python and asyncua
我正在使用 asyncio 和 asyncua libraries in python 3.8.7 and I want to use asyncio.Queue 将数据从异步函数传递到主线程。
但是,我遇到的问题是我的队列在第一个 await
之后没有收到任何消息。
这是我的例子:
import threading
import time
import asyncio
from asyncua import Client
import aiohttp
async def main(queue1):
queue1.put_nowait("main")
client = Client(url='opc.tcp://localhost:4840/freeopcua/server/')
#client = aiohttp.ClientSession()
async with client:
await queue1.put("in_async")
queue1.put_nowait("in_async_2")
queue1.put_nowait("after await")
def wrapper(queue1: asyncio.Queue):
queue1.put_nowait("wrapper")
asyncio.run(main(queue1))
if __name__ == '__main__':
queue1 = asyncio.Queue()
t = threading.Thread(target=wrapper, args=(queue1,))
t.start()
time.sleep(1)
noEx = True
while noEx:
try:
x = queue1.get_nowait()
print(x)
except asyncio.QueueEmpty:
noEx = False
我得到:
wrapper
main
如果我使用其他一些异步库(示例中的 aiohttp),那么一切都会按预期工作:
wrapper
main
in_async
in_async_2
after await
我已验证 opc.tcp://localhost:4840/freeopcua/server/
服务器上的 minimal asyncua server 可以正常工作 - 我可以使用此示例中的代码获取数据,但队列似乎不起作用。有什么我想念的吗?
这是我的猜测,但我认为你有竞争条件。这是主线程中的 while 循环:
while noEx:
try:
x = queue1.get_nowait()
print(x)
except asyncio.QueueEmpty:
noEx = False
一旦打印出某些内容,循环就会尝试获取队列中的下一个内容。如果队列在那一刻为空,while 循环将退出,并且不会再打印任何内容。
如果在您的辅助线程中建立连接的速度足够快,队列将填充接下来的两条消息;但是如果那里有一点延迟,则您的主 while 循环可能在消息有机会打印之前已经退出。我很确定在进入 async with: 块之前可能存在(未知的)网络延迟。
我会尝试在 print(x) 语句之后插入一个显着的时间延迟,看看会发生什么(如果这能解决问题,您可以稍后改进代码)。或者更改 while 循环,使其永远运行,因为您始终可以使用 control-C 退出程序。
我正在使用 asyncio 和 asyncua libraries in python 3.8.7 and I want to use asyncio.Queue 将数据从异步函数传递到主线程。
但是,我遇到的问题是我的队列在第一个 await
之后没有收到任何消息。
这是我的例子:
import threading
import time
import asyncio
from asyncua import Client
import aiohttp
async def main(queue1):
queue1.put_nowait("main")
client = Client(url='opc.tcp://localhost:4840/freeopcua/server/')
#client = aiohttp.ClientSession()
async with client:
await queue1.put("in_async")
queue1.put_nowait("in_async_2")
queue1.put_nowait("after await")
def wrapper(queue1: asyncio.Queue):
queue1.put_nowait("wrapper")
asyncio.run(main(queue1))
if __name__ == '__main__':
queue1 = asyncio.Queue()
t = threading.Thread(target=wrapper, args=(queue1,))
t.start()
time.sleep(1)
noEx = True
while noEx:
try:
x = queue1.get_nowait()
print(x)
except asyncio.QueueEmpty:
noEx = False
我得到:
wrapper
main
如果我使用其他一些异步库(示例中的 aiohttp),那么一切都会按预期工作:
wrapper
main
in_async
in_async_2
after await
我已验证 opc.tcp://localhost:4840/freeopcua/server/
服务器上的 minimal asyncua server 可以正常工作 - 我可以使用此示例中的代码获取数据,但队列似乎不起作用。有什么我想念的吗?
这是我的猜测,但我认为你有竞争条件。这是主线程中的 while 循环:
while noEx:
try:
x = queue1.get_nowait()
print(x)
except asyncio.QueueEmpty:
noEx = False
一旦打印出某些内容,循环就会尝试获取队列中的下一个内容。如果队列在那一刻为空,while 循环将退出,并且不会再打印任何内容。
如果在您的辅助线程中建立连接的速度足够快,队列将填充接下来的两条消息;但是如果那里有一点延迟,则您的主 while 循环可能在消息有机会打印之前已经退出。我很确定在进入 async with: 块之前可能存在(未知的)网络延迟。
我会尝试在 print(x) 语句之后插入一个显着的时间延迟,看看会发生什么(如果这能解决问题,您可以稍后改进代码)。或者更改 while 循环,使其永远运行,因为您始终可以使用 control-C 退出程序。