从 python 管理 Azure 中的事件中心

Managing event hubs in Azure from python

我正在尝试编写 Python 代码,以便能够从 Python 脚本创建/删除 Azure 事件中心上的事件中心。我已经按照此页面上的文档创建了一个 EventHubManagementClient。我认为现在需要使用此处文档中所述的 EventHubsOperations class。

我有 2 个挑战:

  1. “from aaaa import EventHubsOperations”行中的 'aaaa' 是什么,以便能够引用 class?我似乎找不到如何调用相应的包来导入 class...
  2. 使用该 class 时,需要为 config、serializer 和 deserializer 传递什么值?也许有人可以分享一个如何使用这个 class 的例子?

理想情况下,我希望在 Python 脚本中调用 create_or_update 方法来创建新的事件中心,或调用 delete 方法来删除现有的事件中心。如果有人可以分享应该如何扩展这段代码来实现这一点,我将不胜感激。文档的说明非常简略:“config,必需,服务客户端配置”...

我的代码如下:

import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential

# Load environment variables through the project-local setenv helper.
setenv.import_env_vars('')

# Both settings are required: a missing variable raises KeyError here.
vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]

credential = DefaultAzureCredential()

print('Creating EH_client...')
# NOTE(review): the Key Vault URL is passed as the first positional argument;
# presumably the client expects (credential, subscription_id) -- confirm
# against the EventHubManagementClient documentation.
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')

# EventHubsOperations is never imported in this script, so this line raises
# the NameError shown in the output below.
EventHubsOperations(EH_client)

结果输出如下:

Project root: 
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
  File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
    EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined

Process finished with exit code 1

使用以下语句导入 EventHubsOperations class:

from azure.mgmt.eventhub.v2021_01_01_preview.operations import EventHubsOperations

config为服务客户端配置详情

serializer 和 deserializer 对象用于将对象转储为字节流以及从字节流中加载对象,类似于 pickle 模块。更多信息请参见链接(Link)。

准备好这些参数后,您需要创建 EventHubsOperations class 的对象,并传入所有参数的值。

object_name = EventHubsOperations(client=(value), config=(value), serializer=(value), deserializer=(value))

使用此对象,您可以调用该 class 的 create_or_update 和 delete 方法,并传入必需的参数。

object_name.create_or_update(resource_group_name, namespace_name, event_hub_name, parameters, **kwargs)

object_name.delete(resource_group_name, namespace_name, event_hub_name, **kwargs)

您还可以参阅源代码:Source code for azure.mgmt.eventhub.v2018_01_01_preview.operations.event

这是我用于从 Python 创建和删除事件中心的代码。

我使用单独的脚本 (setenv.py) 加载存储在文本文件中的环境变量。

import os
import setenv
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)

# Folder that holds the env_values file. The backslashes are doubled because
# an unescaped "\U" starts a Unicode escape and a trailing "\" would escape
# the closing quote -- both are syntax errors in a normal string literal.
set_env_path = "C:\\Users\\db533\\PycharmProjects\\GunaBot2\\shared_files\\"
setenv.import_env_vars(set_env_path, 'env_values')

def main():
    """Create an Event Hub with its supporting resources, inspect it, then delete everything.

    Steps, in order: create a resource group, a storage account, an Event Hubs
    namespace and an event hub that captures to the storage account; read the
    event hub back; create and read an authorization rule; list its keys; then
    delete the event hub, namespace, storage account and resource group.

    Credentials are read from the environment variables
    AZURE_SUBSCRIPTION_ID, AZURE_TENANT_ID, AZURE_CLIENT_ID and
    AZURE_CLIENT_SECRET.

    Raises:
        KeyError: If any of the four environment variables is missing.
    """
    # Fail fast on a missing subscription id instead of crashing later when
    # the storage account resource id is assembled from it.
    SUBSCRIPTION_ID = os.environ["AZURE_SUBSCRIPTION_ID"]
    GROUP_NAME = "annabot-eventhub2"
    STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
    NAMESPACE_NAME = "annabot-eventhub999"
    EVENTHUB_NAME = "worker99901"

    tenant_id = os.environ["AZURE_TENANT_ID"]
    client_id = os.environ["AZURE_CLIENT_ID"]
    # The client secret is deliberately NOT printed: echoing credentials to
    # stdout leaks them into terminals and log files.
    client_secret = os.environ["AZURE_CLIENT_SECRET"]

    credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)

    # Create clients
    print(" Create resource client...")
    resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)

    print(" Create Event hub client...")
    eventhub_client = EventHubManagementClient(credential_common, SUBSCRIPTION_ID)

    print(" Create storage client...")
    storage_client = StorageManagementClient(credential_common, SUBSCRIPTION_ID)

    # Create resource group
    print(" Create resource group...")
    resource_client.resource_groups.create_or_update(
        GROUP_NAME,
        {"location": "germanywestcentral"}
    )

    # Create StorageAccount
    print(" Create storageAccount...")
    storage_async_operation = storage_client.storage_accounts.create(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME,
        StorageAccountCreateParameters(
            sku=Sku(name=SkuName.standard_lrs),
            kind=Kind.storage_v2,
            location='germanywestcentral'
        )
    )
    # Presumably blocks until the storage account exists; the event hub's
    # capture destination below refers to it.
    storage_async_operation.result()

    # Create Namespace
    print(" Create event hub namespace...")
    eventhub_client.namespaces.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        {
          "sku": {
            "name": "Standard",
            "tier": "Standard"
          },
          "location": "Germany West Central",
          "tags": {
            "tag1": "value1",
            "tag2": "value2"
          },
          "kafka_enabled": "True"
        }
    ).result()

    # Resource id of the storage account used as the capture destination.
    storage_account_id = (
        "/subscriptions/" + SUBSCRIPTION_ID
        + "/resourceGroups/" + GROUP_NAME
        + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME
    )

    # Create EventHub
    print(" Create event hub...")
    eventhub = eventhub_client.event_hubs.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME,
        {
          "message_retention_in_days": "4",
          "partition_count": "4",
          "status": "Active",
          "capture_description": {
            "enabled": True,
            "encoding": "Avro",
            "interval_in_seconds": "120",
            "size_limit_in_bytes": "10485763",
            "destination": {
              "name": "EventHubArchive.AzureBlockBlob",
              "storage_account_resource_id": storage_account_id,
              "blob_container": "container",
              "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
          }
        }
    )
    print("Created EventHub: {}".format(eventhub))

    # Get EventHub
    eventhub = eventhub_client.event_hubs.get(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("get() for EventHub: {}\n".format(eventhub))

    # Create authorisation rule
    eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager",
        rights=["LISTEN","SEND"]
    )
    print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))

    # Get authorisation rule
    eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))

    # List keys
    namespace_keys = eventhub_client.event_hubs.list_keys(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    # NOTE(review): the two prints below expose the rule's keys and connection
    # string; kept because showing them appears to be the point of this demo,
    # but remove them before running anywhere output is logged.
    print("list_keys() for EventHub: {}\n".format(namespace_keys))
    print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)

    # Delete EventHub
    eventhub_client.event_hubs.delete(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("Delete EventHub.")

    # Delete Namespace
    eventhub_client.namespaces.delete(
        GROUP_NAME,
        NAMESPACE_NAME
    ).result()

    # Delete StorageAccount
    storage_client.storage_accounts.delete(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME
    )

    # Delete resource group
    resource_client.resource_groups.delete(
        GROUP_NAME
    ).result()


# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

加载环境变量的 setenv.py 脚本如下。(这是我从另一个答案中得到的,并非我的原创……):

import os

def import_env_vars(env_folder,env_filename):
    """Load KEY=VALUE pairs from a text file into ``os.environ``.

    Each non-blank line is split at its first ``=``; the text before it
    becomes the variable name and everything after it (including any further
    ``=`` characters) becomes the value.

    Args:
        env_folder: Folder containing the file. An empty string means the
            file name is used as given (relative to the working directory).
        env_filename: Name of the file inside ``env_folder``.

    Raises:
        Exception: If the file cannot be opened.
        ValueError: If a non-blank line contains no ``=`` separator.
    """
    print("env_folder:", env_folder)
    # os.path.join adds the platform's separator only when it is missing,
    # replacing the hand-written trailing-backslash check.
    env_path = os.path.join(env_folder, env_filename)
    print("filename:", env_path)
    try:
        envfile = open(env_path, "r")
    except IOError as err:
        raise Exception(
            "You must have a {0} file in your project root "
            "in order to run the server in your local machine. "
            "This specifies some necessary environment variables. ".format(env_path)
        ) from err

    # The context manager guarantees the file is closed even if a line is malformed.
    with envfile:
        for line in envfile:
            entry = line.strip()
            if not entry:
                # Tolerate blank lines, e.g. a trailing newline at end of file.
                continue
            key, separator, value = entry.partition("=")
            if not separator:
                raise ValueError(
                    "Expected KEY=VALUE in {0}, got: {1!r}".format(env_path, entry)
                )
            os.environ[key] = value
            # Only the name is echoed; values may be secrets and must not
            # end up in terminal output or logs.
            print("key:", key)

环境变量在文件中定义如下:

EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
DEV_STAGE=Dev
AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq

希望这对其他人有帮助。