如何在 Azure 事件中心创建多个接收者以避免重复?
How to create multiple receivers in Azure Event Hubs to avoid duplicates?
我有一个生产者正在将事件发送到事件中心。
我想创建 2 个接收器来接收来自事件中心的事件。如何实现?
接收者代码:
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
#Eventhub access credentials
# (values are redacted as **** in the question; replace them with real strings)
connection_str = ****
consumer_group = '$Default'
eventhub_name = ****
#Blobstorage Storage credentials
storage_connection_str = ****
container_name = ****
storageAccount = ****
#For checkpointing in Blob storage
# NOTE: this store is created here, but this snippet's get_messages() never
# passes it to EventHubConsumerClient, so it does not take part in receiving.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
#Initiate BlobServiceClient to access the Blob storage
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
# NOTE(review): 'container_name' is a string literal, not the container_name
# variable above, so a container literally named "container_name" is used.
container_client = blob_service_client.get_container_client('container_name') #Dump final data to the Blob storage in append mode.
try:
    container_client.create_container() #Create new Container in the service
    properties = container_client.get_container_properties()  # result is not used afterwards
except ResourceExistsError:
    # Creating an existing container raises ResourceExistsError; treat it as fine.
    print("Container already exists.")
#Instantiate a new BlobClient
#blob_client = container_client.get_blob_client("data.csv")
def get_messages():
    # Receive events from the Event Hub and checkpoint them (question version:
    # two copies of this script both receive every event).
    # NOTE: no checkpoint_store is passed. Per the azure-eventhub docs the client
    # then keeps checkpoints in memory and receives WITHOUT load balancing, so each
    # process claims all partitions on its own -- the cause of the duplicates.
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    def on_event_batch(partition_context, events):
        #log.info("Partition {}, Received count: {}".format(partition_context.partition_id, len(events)))
        print("Received event from partition {}".format(
            partition_context.partition_id)) # ID of the partition this batch came from; not fixed to 0.
        if (len(events) == 0):
            # Per the SDK docs an empty batch is only delivered when max_wait_time is
            # passed to receive_batch(); it is not passed below, so presumably unreachable.
            client.close() # closing the client if there is no event triggered.
        else:
            for event in events:
                list_ = event.body_as_json()  # parsed body is not used yet
            # Update checkpoint
            partition_context.update_checkpoint()
    try:
        with client:
            # NOTE(review): the documented keyword is `partition_id`; `PARTITION` is not a
            # receive_batch() parameter and presumably has no effect -- confirm.
            client.receive_batch(
                on_event_batch=on_event_batch,
                PARTITION="0",)
                #starting_position="-1", ) # "-1" is from the beginning of the partition.
    except KeyboardInterrupt:
        print('Stopped receiving.')
get_messages()
我创建了此代码的 2 个副本,名称分别为 consumer1.py 和 consumer2.py。但是这两个消费者每次都会收到相同的事件。
例如,我发送了 100 个事件,希望这两个消费者并行运行,把这 100 个事件分摊开来处理,避免重复。如何实现?
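最终我找到了解决方案:在同一个消费者组下创建多个消费者,并在创建 EventHubConsumerClient 时传入共享的 checkpoint_store(BlobCheckpointStore),同时调用 receive_batch 时不要指定分区(指定 partition_id 会禁用负载均衡)。这样这些消费者会通过 Blob 存储协调分区所有权,并行消费事件并彼此分担负载,而不是各自收到全部事件。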
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
# Event Hub access credentials (placeholders: replace with your own values).
connection_str = "<EVENT_HUB_NAMESPACE_CONNECTION_STRING>"
consumer_group = '$Default'
eventhub_name = "<EVENT_HUB_NAME>"

# Blob Storage credentials.
storage_connection_str = "<STORAGE_ACCOUNT_CONNECTION_STRING>"
container_name = "<CHECKPOINT_CONTAINER_NAME>"
storageAccount = "<STORAGE_ACCOUNT_NAME>"

# Checkpoint store shared by every consumer process. Partition ownership and
# checkpoints are persisted in this Blob container; passing the same store to
# every EventHubConsumerClient is what lets the consumers split the partitions.
# NOTE(review): the checkpoint container is expected to exist already -- confirm.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

# Container that the consumers write their output data to (separate from the
# checkpoint container above).
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
container_client = blob_service_client.get_container_client('nsc-container')
try:
    container_client.create_container()
except ResourceExistsError:
    # Already created by another consumer process or an earlier run.
    print("Container already exists.")

# To write output data, obtain a blob client from the container, e.g.:
# blob_client = container_client.get_blob_client("data.csv")
def get_messages():
    """Receive events as one member of a load-balanced consumer group.

    Start this same script in several processes (consumer_1.py, consumer_2.py, ...).
    All of them use the same consumer group and the same Blob-backed
    ``checkpoint_store``. With a shared checkpoint store the SDK balances partition
    ownership between the clients, so partitions (and their events) are split
    across consumers instead of every consumer reading everything.

    Blocks until the process is interrupted with Ctrl+C.
    """
    import time  # needed for the timing output; not imported at file level

    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        # Shared, persistent store -> load balancing between consumer processes.
        checkpoint_store=checkpoint_store,
    )

    # Kept outside the callback so the counts accumulate across batches
    # (one running total per partition).
    received_by_partition = {}
    started_at = time.time()

    def on_event_batch(partition_context, events):
        partition_id = partition_context.partition_id
        # Batches come from whichever partitions this consumer currently owns.
        print("Received event from partition {}".format(partition_id))

        if not events:
            # Per the SDK docs an empty batch is only delivered when receive_batch()
            # is given max_wait_time (not done below); stop consuming in that case.
            client.close()
            return

        for event in events:
            payload = event.body_as_json()  # noqa: F841 - TODO: process/store payload

        # One checkpoint (one Blob write) per batch, after the batch is handled.
        partition_context.update_checkpoint(events[-1])

        batch_size = len(events)
        received_by_partition[partition_id] = received_by_partition.get(partition_id, 0) + batch_size
        elapsed = time.time() - started_at
        print("Number of events received: ", batch_size)
        print("\nTotal Received {} records from partition {} in {} seconds.".format(
            received_by_partition[partition_id], partition_id, elapsed))

    try:
        with client:
            # Do not pass partition_id: receiving from a specific partition
            # disables load balancing between consumers.
            client.receive_batch(on_event_batch=on_event_batch)
    except KeyboardInterrupt:
        print('Stopped receiving.')


if __name__ == "__main__":
    get_messages()
现在可以按需创建多个代码副本(例如 consumer_1.py、consumer_2.py 等)并同时运行它们。注意:同一时刻每个分区只会分配给一个消费者,超出分区数的消费者将处于空闲状态;因此消费者数量不应超过分区数,二者相等时负载分配最均衡、效率最高。
我有一个生产者正在将事件发送到事件中心。我想创建 2 个接收器来接收来自事件中心的事件。如何实现?
接收者代码:
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
#Eventhub access credentials
# (values are redacted as **** in the question; replace them with real strings)
connection_str = ****
consumer_group = '$Default'
eventhub_name = ****
#Blobstorage Storage credentials
storage_connection_str = ****
container_name = ****
storageAccount = ****
#For checkpointing in Blob storage
# NOTE: this store is created here, but this snippet's get_messages() never
# passes it to EventHubConsumerClient, so it does not take part in receiving.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
#Initiate BlobServiceClient to access the Blob storage
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
# NOTE(review): 'container_name' is a string literal, not the container_name
# variable above, so a container literally named "container_name" is used.
container_client = blob_service_client.get_container_client('container_name') #Dump final data to the Blob storage in append mode.
try:
    container_client.create_container() #Create new Container in the service
    properties = container_client.get_container_properties()  # result is not used afterwards
except ResourceExistsError:
    # Creating an existing container raises ResourceExistsError; treat it as fine.
    print("Container already exists.")
#Instantiate a new BlobClient
#blob_client = container_client.get_blob_client("data.csv")
def get_messages():
    # Receive events from the Event Hub and checkpoint them (question version:
    # two copies of this script both receive every event).
    # NOTE: no checkpoint_store is passed. Per the azure-eventhub docs the client
    # then keeps checkpoints in memory and receives WITHOUT load balancing, so each
    # process claims all partitions on its own -- the cause of the duplicates.
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    def on_event_batch(partition_context, events):
        #log.info("Partition {}, Received count: {}".format(partition_context.partition_id, len(events)))
        print("Received event from partition {}".format(
            partition_context.partition_id)) # ID of the partition this batch came from; not fixed to 0.
        if (len(events) == 0):
            # Per the SDK docs an empty batch is only delivered when max_wait_time is
            # passed to receive_batch(); it is not passed below, so presumably unreachable.
            client.close() # closing the client if there is no event triggered.
        else:
            for event in events:
                list_ = event.body_as_json()  # parsed body is not used yet
            # Update checkpoint
            partition_context.update_checkpoint()
    try:
        with client:
            # NOTE(review): the documented keyword is `partition_id`; `PARTITION` is not a
            # receive_batch() parameter and presumably has no effect -- confirm.
            client.receive_batch(
                on_event_batch=on_event_batch,
                PARTITION="0",)
                #starting_position="-1", ) # "-1" is from the beginning of the partition.
    except KeyboardInterrupt:
        print('Stopped receiving.')
get_messages()
我创建了此代码的 2 个副本,名称分别为 consumer1.py 和 consumer2.py。但是这两个消费者每次都会收到相同的事件。
例如,我发送了 100 个事件,希望这两个消费者并行运行,把这 100 个事件分摊开来处理,避免重复。如何实现?
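最终我找到了解决方案:在同一个消费者组下创建多个消费者,并在创建 EventHubConsumerClient 时传入共享的 checkpoint_store(BlobCheckpointStore),同时调用 receive_batch 时不要指定分区(指定 partition_id 会禁用负载均衡)。这样这些消费者会通过 Blob 存储协调分区所有权,并行消费事件并彼此分担负载,而不是各自收到全部事件。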
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
# Event Hub access credentials (placeholders: replace with your own values).
connection_str = "<EVENT_HUB_NAMESPACE_CONNECTION_STRING>"
consumer_group = '$Default'
eventhub_name = "<EVENT_HUB_NAME>"

# Blob Storage credentials.
storage_connection_str = "<STORAGE_ACCOUNT_CONNECTION_STRING>"
container_name = "<CHECKPOINT_CONTAINER_NAME>"
storageAccount = "<STORAGE_ACCOUNT_NAME>"

# Checkpoint store shared by every consumer process. Partition ownership and
# checkpoints are persisted in this Blob container; passing the same store to
# every EventHubConsumerClient is what lets the consumers split the partitions.
# NOTE(review): the checkpoint container is expected to exist already -- confirm.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

# Container that the consumers write their output data to (separate from the
# checkpoint container above).
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
container_client = blob_service_client.get_container_client('nsc-container')
try:
    container_client.create_container()
except ResourceExistsError:
    # Already created by another consumer process or an earlier run.
    print("Container already exists.")

# To write output data, obtain a blob client from the container, e.g.:
# blob_client = container_client.get_blob_client("data.csv")
def get_messages():
    """Receive events as one member of a load-balanced consumer group.

    Start this same script in several processes (consumer_1.py, consumer_2.py, ...).
    All of them use the same consumer group and the same Blob-backed
    ``checkpoint_store``. With a shared checkpoint store the SDK balances partition
    ownership between the clients, so partitions (and their events) are split
    across consumers instead of every consumer reading everything.

    Blocks until the process is interrupted with Ctrl+C.
    """
    import time  # needed for the timing output; not imported at file level

    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        # Shared, persistent store -> load balancing between consumer processes.
        checkpoint_store=checkpoint_store,
    )

    # Kept outside the callback so the counts accumulate across batches
    # (one running total per partition).
    received_by_partition = {}
    started_at = time.time()

    def on_event_batch(partition_context, events):
        partition_id = partition_context.partition_id
        # Batches come from whichever partitions this consumer currently owns.
        print("Received event from partition {}".format(partition_id))

        if not events:
            # Per the SDK docs an empty batch is only delivered when receive_batch()
            # is given max_wait_time (not done below); stop consuming in that case.
            client.close()
            return

        for event in events:
            payload = event.body_as_json()  # noqa: F841 - TODO: process/store payload

        # One checkpoint (one Blob write) per batch, after the batch is handled.
        partition_context.update_checkpoint(events[-1])

        batch_size = len(events)
        received_by_partition[partition_id] = received_by_partition.get(partition_id, 0) + batch_size
        elapsed = time.time() - started_at
        print("Number of events received: ", batch_size)
        print("\nTotal Received {} records from partition {} in {} seconds.".format(
            received_by_partition[partition_id], partition_id, elapsed))

    try:
        with client:
            # Do not pass partition_id: receiving from a specific partition
            # disables load balancing between consumers.
            client.receive_batch(on_event_batch=on_event_batch)
    except KeyboardInterrupt:
        print('Stopped receiving.')


if __name__ == "__main__":
    get_messages()
现在可以按需创建多个代码副本(例如 consumer_1.py、consumer_2.py 等)并同时运行它们。注意:同一时刻每个分区只会分配给一个消费者,超出分区数的消费者将处于空闲状态;因此消费者数量不应超过分区数,二者相等时负载分配最均衡、效率最高。