使用 Azure ServiceBus 7.0 SDK,如何使用 ServiceBusManagementClient class 获取对 ServiceBusClient 的引用?

With the Azure ServiceBus 7.0 SDK, how do I get a reference to a ServiceBusClient using a ServiceBusManagementClient class?

我正在使用最新的 Python Azure SDK

azure-mgmt-servicebus==6.0.0
azure-servicebus==7.0.0

我像这样获得对 ServiceBusManagementClient 的引用...

def _get_sb_client(self):
    credential = ClientSecretCredential(
       tenant_id=self._tenant,
        client_id=self._client_id,
        client_secret=self._client_secret
    )

    return ServiceBusManagementClient(
        credential, self._subscription)

然而,根据这个——https://docs.microsoft.com/en-us/python/api/overview/azure/servicebus?view=azure-python#service-bus-topics-and-subscriptions,为了发送一个主题的消息,

topic_client = sb_client.get_topic("MyTopic")

我需要一个 azure.servicebus.ServiceBusClient 对象的引用。但是,除了通过我们未使用的连接字符串之外,文档没有给出关于如何生成此类实例的任何解释。如何使用 ClientSecretCredential 或 ServiceBusManagementClient 实例获取 ServiceBusClient 实例?

我会用一些东西来回应,如果有任何需要澄清或有偏差的话,请告诉我:

  1. 您 link 编辑的文档不是针对 7.0.0 的,而是针对 0.50.3 的,这可能解释了一些混淆。我建议使用植根于 readme for which one of the comparable docs to the one you referenced is here 的文档。 (替换历史文档的过程是一个已知问题并正在解决,以 python SDK 组件的维护者的身份全面披露)
  2. 我会指出您初始化了 ServiceBusManagementClient(将用于 creating/deleting 主题)而不是 ServiceBusClient(将用于发送和接收。)请参阅我列出的文档 above 示例如何发送到主题。我会在这个 post 的底部粘贴一个 link 的综合示例,以防万一将来发生任何变化,这个 post 仍然适用于 post版本 7.0.0
  3. 对于使用 Azure 身份凭证进行身份验证,我会向您指出自述文件 here and the sample link 中的文档,并粘贴在下面。

向 topic/subscription

发送和接收的示例
from azure.servicebus import ServiceBusClient, ServiceBusMessage

CONNECTION_STR = "<NAMESPACE CONNECTION STRING>"
TOPIC_NAME = "<TOPIC NAME>"
SUBSCRIPTION_NAME = "<SUBSCRIPTION NAME>"

def send_single_message(sender):
    message = ServiceBusMessage("Single Message")
    sender.send_messages(message)
    print("Sent a single message")

def send_a_list_of_messages(sender):
    messages = [ServiceBusMessage("Message in list") for _ in range(5)]
    sender.send_messages(messages)
    print("Sent a list of 5 messages")

def send_batch_message(sender):
    batch_message = sender.create_message_batch()
    for _ in range(10):
        try:
            batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
        except ValueError:
            # ServiceBusMessageBatch object reaches max_size.
            # New ServiceBusMessageBatch object can be created here to send more data.
            break
    sender.send_messages(batch_message)
    print("Sent a batch of 10 messages")

servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

with servicebus_client:
    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
    with sender:
        send_single_message(sender)
        send_a_list_of_messages(sender)
        send_batch_message(sender)

print("Done sending messages")
print("-----------------------")

with servicebus_client:
    receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            receiver.complete_message(msg)

使用 Azure Identity 进行身份验证的示例

import os
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import EnvironmentCredential

FULLY_QUALIFIED_NAMESPACE = os.environ['SERVICE_BUS_NAMESPACE']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

credential = EnvironmentCredential()

servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)
with servicebus_client:
    sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
    with sender:
        sender.send_messages(ServiceBusMessage('Single Message'))