从 python 管理 Azure 中的事件中心
Managing event hubs in Azure from python
我正在尝试编写 python 代码,以便能够从 python 脚本创建/删除 Azure 事件中心上的事件中心。我已经设法在 the documentation on this page. I believe I now need to use the EventHubsOperations Class as documented here.
之后创建了一个 EventHubManagementClient
我有 2 个挑战:
- “from aaaa import EventHubsOperations”行中的 'aaaa' 是什么,以便能够引用 class?我似乎找不到如何调用相应的包来导入 class...
- 使用 class 时需要为配置、序列化器和反序列化器传递什么值?也许有人可以分享一个如何使用这个的例子 class?
理想情况下,我希望调用 create_or_delete 方法来创建新的事件中心或从 python 脚本中删除现有的事件中心。如果有人可以分享应该如何扩展这段代码来实现这一点,我将不胜感激。文档看起来非常简单:“配置,必需,服务客户端配置”...
我的代码如下:
import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential
setenv.import_env_vars('')
vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]
credential = DefaultAzureCredential()
print('Creating EH_client...')
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')
EventHubsOperations(EH_client)
结果输出如下:
Project root:
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined
Process finished with exit code 1
使用以下语句导入 EventHubsOperation class:
from azure.mgmt.eventhub.v2021_01_01_preview.operations import EventHubsOperations
config
为服务客户端配置详情
serializer
和 deserializer
对象是有助于将对象转储和加载到字节流中的对象,例如泡菜模块。 Link了解更多。
准备好这些参数后,您需要为 EventHubsOperations 创建对象class 传递所有参数的值。
object_name = EventHubsOperations(client=(value), config=(value), serializer=(value), deserializer=(value))
使用此对象,您可以使用此 class 的 create_or_update
和 delete
方法以及必需的参数。
object_name.create_or_update(resource_group_name, namespace_name, event_hub_name, parameters, **kwargs)
object_name.delete(resource_group_name, namespace_name, event_hub_name, **kwargs)
您还可以发现Source code for azure.mgmt.eventhub.v2018_01_01_preview.operations.event)。
这是我的代码,用于从 python.
创建和删除事件中心
我使用单独的脚本 (setenv.py) 加载存储在文本文件中的环境变量。
import os
import setenv
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)
set_env_path="C:\Users\db533\PycharmProjects\GunaBot2\shared_files\"
setenv.import_env_vars(set_env_path,'env_values')
def main():
SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
GROUP_NAME = "annabot-eventhub2"
STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
NAMESPACE_NAME = "annabot-eventhub999"
EVENTHUB_NAME = "worker99901"
tenant_id = os.environ["AZURE_TENANT_ID"]
client_id = os.environ["AZURE_CLIENT_ID"]
client_secret = os.environ["AZURE_CLIENT_SECRET"]
print('AZURE_CLIENT_SECRET:',client_secret)
credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)
# Create client
print(" Create resource client...")
resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)
print(" Create Event hub client...")
eventhub_client = EventHubManagementClient(credential_common,SUBSCRIPTION_ID)
print(" Create storage client...")
storage_client = StorageManagementClient(credential_common,SUBSCRIPTION_ID)
# Create resource group
print(" Create resource group...")
resource_client.resource_groups.create_or_update(
GROUP_NAME,
{"location": "germanywestcentral"}
)
# Create StorageAccount
print(" Create storageAccount...")
storage_async_operation = storage_client.storage_accounts.create(
GROUP_NAME,
STORAGE_ACCOUNT_NAME,
StorageAccountCreateParameters(
sku=Sku(name=SkuName.standard_lrs),
kind=Kind.storage_v2,
location='germanywestcentral'
)
)
storage_account = storage_async_operation.result()
# Create Namespace
print(" Create event hub namespace...")
eventhub_client.namespaces.create_or_update(
GROUP_NAME,
NAMESPACE_NAME,
{
"sku": {
"name": "Standard",
"tier": "Standard"
},
"location": "Germany West Central",
"tags": {
"tag1": "value1",
"tag2": "value2"
},
"kafka_enabled": "True"
}
).result()
# Create EventHub
print(" Create event hub...")
eventhub = eventhub_client.event_hubs.create_or_update(
GROUP_NAME,
NAMESPACE_NAME,
EVENTHUB_NAME,
{
"message_retention_in_days": "4",
"partition_count": "4",
"status": "Active",
"capture_description": {
"enabled": True,
"encoding": "Avro",
"interval_in_seconds": "120",
"size_limit_in_bytes": "10485763",
"destination": {
"name": "EventHubArchive.AzureBlockBlob",
"storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "",
"blob_container": "container",
"archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
}
}
}
)
print("Created EventHub: {}".format(eventhub))
# Get EventHub
eventhub = eventhub_client.event_hubs.get(
GROUP_NAME,
NAMESPACE_NAME,
EVENTHUB_NAME
)
print("get() for EventHub: {}\n".format(eventhub))
#Create authorisation rule
eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
GROUP_NAME,
NAMESPACE_NAME,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="manager",
rights=["LISTEN","SEND"]
)
print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))
# Get authorisation rule
eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
GROUP_NAME,
NAMESPACE_NAME,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="manager"
)
print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))
# List keys
namespace_keys = eventhub_client.event_hubs.list_keys(
GROUP_NAME,
NAMESPACE_NAME,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="manager"
)
print("list_keys() for EventHub: {}\n".format(namespace_keys))
print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)
# Delete EventHub
eventhub_client.event_hubs.delete(
GROUP_NAME,
NAMESPACE_NAME,
EVENTHUB_NAME
)
print("Delete EventHub.")
# Delete Namespace
eventhub_client.namespaces.delete(
GROUP_NAME,
NAMESPACE_NAME
).result()
# Delete StorageAccount
storage_client.storage_accounts.delete(
GROUP_NAME,
STORAGE_ACCOUNT_NAME
)
# Delete resource group
resource_client.resource_groups.delete(
GROUP_NAME
).result()
if __name__ == "__main__":
main()
加载环境变量的setenv.py脚本如下。 (我从另一个答案中得到这个。不能以此为荣......):
import os
def import_env_vars(env_folder,env_filename):
"""Imports some environment variables from a special .env file in the
project root directory.
"""
print("env_folder:",env_folder)
if len(env_folder) > 0 and env_folder[-1] != '\':
env_folder += '\'
try:
print("filename:",env_folder+env_filename)
envfile = open(env_folder+env_filename, "r")
except IOError:
raise Exception("You must have a {0} file in your project root "
"in order to run the server in your local machine. "
"This specifies some necessary environment variables. ")
for line in envfile.readlines():
[key,value] = line.strip().split("=")
os.environ[key] = value
print("key:",key)
print("value:", value)
环境变量在文件中定义如下:
EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
DEV_STAGE=Dev
AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq
希望这对其他人有帮助。
我正在尝试编写 python 代码,以便能够从 python 脚本创建/删除 Azure 事件中心上的事件中心。我已经设法在 the documentation on this page. I believe I now need to use the EventHubsOperations Class as documented here.
之后创建了一个 EventHubManagementClient我有 2 个挑战:
- “from aaaa import EventHubsOperations”行中的 'aaaa' 是什么,以便能够引用 class?我似乎找不到如何调用相应的包来导入 class...
- 使用 class 时需要为配置、序列化器和反序列化器传递什么值?也许有人可以分享一个如何使用这个的例子 class?
理想情况下,我希望调用 create_or_delete 方法来创建新的事件中心或从 python 脚本中删除现有的事件中心。如果有人可以分享应该如何扩展这段代码来实现这一点,我将不胜感激。文档看起来非常简单:“配置,必需,服务客户端配置”...
我的代码如下:
import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential
setenv.import_env_vars('')
vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]
credential = DefaultAzureCredential()
print('Creating EH_client...')
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')
EventHubsOperations(EH_client)
结果输出如下:
Project root:
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined
Process finished with exit code 1
使用以下语句导入 EventHubsOperation class:
from azure.mgmt.eventhub.v2021_01_01_preview.operations import EventHubsOperations
config
为服务客户端配置详情
serializer
和 deserializer
对象是有助于将对象转储和加载到字节流中的对象,例如泡菜模块。 Link了解更多。
准备好这些参数后,您需要为 EventHubsOperations 创建对象class 传递所有参数的值。
object_name = EventHubsOperations(client=(value), config=(value), serializer=(value), deserializer=(value))
使用此对象,您可以使用此 class 的 create_or_update
和 delete
方法以及必需的参数。
object_name.create_or_update(resource_group_name, namespace_name, event_hub_name, parameters, **kwargs)
object_name.delete(resource_group_name, namespace_name, event_hub_name, **kwargs)
您还可以发现Source code for azure.mgmt.eventhub.v2018_01_01_preview.operations.event)。
这是我的代码,用于从 python.
创建和删除事件中心我使用单独的脚本 (setenv.py) 加载存储在文本文件中的环境变量。
import os
import setenv
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)
set_env_path="C:\Users\db533\PycharmProjects\GunaBot2\shared_files\"
setenv.import_env_vars(set_env_path,'env_values')
def main():
SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
GROUP_NAME = "annabot-eventhub2"
STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
NAMESPACE_NAME = "annabot-eventhub999"
EVENTHUB_NAME = "worker99901"
tenant_id = os.environ["AZURE_TENANT_ID"]
client_id = os.environ["AZURE_CLIENT_ID"]
client_secret = os.environ["AZURE_CLIENT_SECRET"]
print('AZURE_CLIENT_SECRET:',client_secret)
credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)
# Create client
print(" Create resource client...")
resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)
print(" Create Event hub client...")
eventhub_client = EventHubManagementClient(credential_common,SUBSCRIPTION_ID)
print(" Create storage client...")
storage_client = StorageManagementClient(credential_common,SUBSCRIPTION_ID)
# Create resource group
print(" Create resource group...")
resource_client.resource_groups.create_or_update(
GROUP_NAME,
{"location": "germanywestcentral"}
)
# Create StorageAccount
print(" Create storageAccount...")
storage_async_operation = storage_client.storage_accounts.create(
GROUP_NAME,
STORAGE_ACCOUNT_NAME,
StorageAccountCreateParameters(
sku=Sku(name=SkuName.standard_lrs),
kind=Kind.storage_v2,
location='germanywestcentral'
)
)
storage_account = storage_async_operation.result()
# Create Namespace
print(" Create event hub namespace...")
eventhub_client.namespaces.create_or_update(
GROUP_NAME,
NAMESPACE_NAME,
{
"sku": {
"name": "Standard",
"tier": "Standard"
},
"location": "Germany West Central",
"tags": {
"tag1": "value1",
"tag2": "value2"
},
"kafka_enabled": "True"
}
).result()
# Create EventHub
print(" Create event hub...")
eventhub = eventhub_client.event_hubs.create_or_update(
GROUP_NAME,
NAMESPACE_NAME,
EVENTHUB_NAME,
{
"message_retention_in_days": "4",
"partition_count": "4",
"status": "Active",
"capture_description": {
"enabled": True,
"encoding": "Avro",
"interval_in_seconds": "120",
"size_limit_in_bytes": "10485763",
"destination": {
"name": "EventHubArchive.AzureBlockBlob",
"storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "",
"blob_container": "container",
"archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
}
}
}
)
print("Created EventHub: {}".format(eventhub))
# Get EventHub
eventhub = eventhub_client.event_hubs.get(
GROUP_NAME,
NAMESPACE_NAME,
EVENTHUB_NAME
)
print("get() for EventHub: {}\n".format(eventhub))
#Create authorisation rule
eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
GROUP_NAME,
NAMESPACE_NAME,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="manager",
rights=["LISTEN","SEND"]
)
print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))
# Get authorisation rule
eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
GROUP_NAME,
NAMESPACE_NAME,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="manager"
)
print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))
# List keys
namespace_keys = eventhub_client.event_hubs.list_keys(
GROUP_NAME,
NAMESPACE_NAME,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="manager"
)
print("list_keys() for EventHub: {}\n".format(namespace_keys))
print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)
# Delete EventHub
eventhub_client.event_hubs.delete(
GROUP_NAME,
NAMESPACE_NAME,
EVENTHUB_NAME
)
print("Delete EventHub.")
# Delete Namespace
eventhub_client.namespaces.delete(
GROUP_NAME,
NAMESPACE_NAME
).result()
# Delete StorageAccount
storage_client.storage_accounts.delete(
GROUP_NAME,
STORAGE_ACCOUNT_NAME
)
# Delete resource group
resource_client.resource_groups.delete(
GROUP_NAME
).result()
if __name__ == "__main__":
main()
加载环境变量的setenv.py脚本如下。 (我从另一个答案中得到这个。不能以此为荣......):
import os
def import_env_vars(env_folder,env_filename):
"""Imports some environment variables from a special .env file in the
project root directory.
"""
print("env_folder:",env_folder)
if len(env_folder) > 0 and env_folder[-1] != '\':
env_folder += '\'
try:
print("filename:",env_folder+env_filename)
envfile = open(env_folder+env_filename, "r")
except IOError:
raise Exception("You must have a {0} file in your project root "
"in order to run the server in your local machine. "
"This specifies some necessary environment variables. ")
for line in envfile.readlines():
[key,value] = line.strip().split("=")
os.environ[key] = value
print("key:",key)
print("value:", value)
环境变量在文件中定义如下:
EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
DEV_STAGE=Dev
AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq
希望这对其他人有帮助。