在 Python 中使用 async/await 的 Azure ServiceBus 似乎不起作用
Azure ServiceBus using async/await in Python seems not to work
我正在尝试使用 async/await
从 Azure ServiceBus 主题读取消息,然后通过 HTTP 将内容转发到另一个应用程序。我的代码很简单:
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio.async_client import ServiceBusService
bus_service = ServiceBusService(service_namespace=..., shared_access_key_name=..., shared_access_key_value=...)
async def watch(topic_name, subscription_name):
print('{} started'.format(topic_name))
message = bus_service.receive_subscription_message(topic_name, subscription_name, peek_lock=False, timeout=1)
if message.body is not None:
async with ClientSession() as session:
await session.post('ip:port/endpoint',
headers={'Content-type': 'application/x-www-form-urlencoded'},
data={'data': message.body.decode()})
async def do():
while True:
for topic in ['topic1', 'topic2', 'topic3']:
await watch(topic, 'watcher')
if __name__ == "__main__":
asyncio.run(do())
我想(永远)寻找来自各种主题的消息,当消息到达时发送 POST。我从 azure
导入 aio
包,它应该以异步方式工作。经过多次尝试,我得到的唯一解决方案是 while True
并设置 timeout=1
。这不是我想要的,我是按顺序做的。
azure-servicebus
版本 0.50.3
.
这是我第一次使用 async/await
可能我遗漏了什么...
任何 solution/suggestions?
您将必须使用软件包:azure.servicebus.aio
他们有以下异步模块:
我们将不得不使用接收处理程序 class - 它可以用 get_receiver() 方法实例化。使用此对象,您将能够异步地遍历消息。启动一个示例脚本,您可以进一步优化它:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
Receiving = True
#Topic 1 receiver :
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver = SubsClient.get_receiver()
#Topic 2 receiver :
conn_str2= "<>"
name2="Allmessages2"
SubsClient2 = SubscriptionClient.from_connection_string(conn_str2, name2)
receiver2 = SubsClient2.get_receiver()
#obj= SubscriptionClient("svijayservicebus","mytopic1", shared_access_key_name="RootManageSharedAccessKey", shared_access_key_value="ySr+maBCmIRDK4I1aGgkoBl5sNNxJt4HTwINo0FQ/tc=")
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs = await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
print(str(m))
await m.complete()
async def receive_message_from2():
await receiver2.open()
print("Opening the Receiver for Topic2")
async with receiver2:
while(Receiving):
msgs = await receiver2.fetch_next()
for m in msgs:
print("Received the message from topic 2.....")
print(str(m))
await m.complete()
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
topic2receiver = loop.create_task(receive_message_from2())
我创建了两个任务来促进并发。您可以参考 post 以更清楚地了解它们。
输出:
以下是使用最新主要版本 v7 servicebus 的方法
请查看发送和接收订阅消息的异步示例
https://github.com/Azure/azure-sdk-for-python/blob/04290863fa8963ec525a0b2f4079595287e15d93/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py
import os
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio import ServiceBusClient
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
async def watch(topic_name, subscription_name):
async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:
subscription_receiver = servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=subscription_name,
)
async with subscription_receiver:
message = await subscription_receiver.receive_messages(max_wait_time=1)
if message.body is not None:
async with ClientSession() as session:
await session.post('ip:port/endpoint',
headers={'Content-type': 'application/x-www-form-urlencoded'},
data={'data': message.body.decode()})
async def do():
while True:
for topic in ['topic1', 'topic2', 'topic3']:
await watch(topic, 'watcher')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
我正在尝试使用 async/await
从 Azure ServiceBus 主题读取消息,然后通过 HTTP 将内容转发到另一个应用程序。我的代码很简单:
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio.async_client import ServiceBusService
bus_service = ServiceBusService(service_namespace=..., shared_access_key_name=..., shared_access_key_value=...)
async def watch(topic_name, subscription_name):
print('{} started'.format(topic_name))
message = bus_service.receive_subscription_message(topic_name, subscription_name, peek_lock=False, timeout=1)
if message.body is not None:
async with ClientSession() as session:
await session.post('ip:port/endpoint',
headers={'Content-type': 'application/x-www-form-urlencoded'},
data={'data': message.body.decode()})
async def do():
while True:
for topic in ['topic1', 'topic2', 'topic3']:
await watch(topic, 'watcher')
if __name__ == "__main__":
asyncio.run(do())
我想(永远)寻找来自各种主题的消息,当消息到达时发送 POST。我从 azure
导入 aio
包,它应该以异步方式工作。经过多次尝试,我得到的唯一解决方案是 while True
并设置 timeout=1
。这不是我想要的,我是按顺序做的。
azure-servicebus
版本 0.50.3
.
这是我第一次使用 async/await
可能我遗漏了什么...
任何 solution/suggestions?
您将必须使用软件包:azure.servicebus.aio
他们有以下异步模块:
我们将不得不使用接收处理程序 class - 它可以用 get_receiver() 方法实例化。使用此对象,您将能够异步地遍历消息。启动一个示例脚本,您可以进一步优化它:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
Receiving = True
#Topic 1 receiver :
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver = SubsClient.get_receiver()
#Topic 2 receiver :
conn_str2= "<>"
name2="Allmessages2"
SubsClient2 = SubscriptionClient.from_connection_string(conn_str2, name2)
receiver2 = SubsClient2.get_receiver()
#obj= SubscriptionClient("svijayservicebus","mytopic1", shared_access_key_name="RootManageSharedAccessKey", shared_access_key_value="ySr+maBCmIRDK4I1aGgkoBl5sNNxJt4HTwINo0FQ/tc=")
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs = await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
print(str(m))
await m.complete()
async def receive_message_from2():
await receiver2.open()
print("Opening the Receiver for Topic2")
async with receiver2:
while(Receiving):
msgs = await receiver2.fetch_next()
for m in msgs:
print("Received the message from topic 2.....")
print(str(m))
await m.complete()
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
topic2receiver = loop.create_task(receive_message_from2())
我创建了两个任务来促进并发。您可以参考
输出:
以下是使用最新主要版本 v7 servicebus 的方法 请查看发送和接收订阅消息的异步示例 https://github.com/Azure/azure-sdk-for-python/blob/04290863fa8963ec525a0b2f4079595287e15d93/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py
import os
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio import ServiceBusClient
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
async def watch(topic_name, subscription_name):
async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:
subscription_receiver = servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=subscription_name,
)
async with subscription_receiver:
message = await subscription_receiver.receive_messages(max_wait_time=1)
if message.body is not None:
async with ClientSession() as session:
await session.post('ip:port/endpoint',
headers={'Content-type': 'application/x-www-form-urlencoded'},
data={'data': message.body.decode()})
async def do():
while True:
for topic in ['topic1', 'topic2', 'topic3']:
await watch(topic, 'watcher')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(do())