如何在 Azure 事件中心创建多个接收者以避免重复?

How to create multiple receivers in Azure Event Hubs to avoid duplicates?

我有一个生产者正在将事件发送到事件中心。我想创建 2 个接收者来接收来自事件中心的事件,该如何实现?

接收者代码:

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Event Hubs access credentials (redacted -- replace the placeholders).
connection_str = "****"
consumer_group = '$Default'
eventhub_name = "****"

# Blob Storage credentials (redacted -- replace the placeholders).
storage_connection_str = "****"
container_name = "****"
storageAccount = "****"

# Checkpoint store backed by Blob Storage. NOTE: it is never passed to
# EventHubConsumerClient in this script, so it is never used.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

# BlobServiceClient for the container that the final data is dumped to
# (in append mode). Use the container_name variable: the literal string
# 'container_name' is a different name, and an invalid one, because
# container names may not contain underscores.
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
container_client = blob_service_client.get_container_client(container_name)

# Create the container, tolerating one that already exists.
try:
    container_client.create_container()
except ResourceExistsError:
    print("Container already exists.")

#Instantiate a new BlobClient
#blob_client = container_client.get_blob_client("data.csv")


def get_messages():
    """Receive event batches from the event hub and checkpoint each batch.

    NOTE: ``checkpoint_store`` is not passed to the client here, so this
    client does no load balancing. It reads every partition by itself, which
    means every copy of this script running in the same consumer group
    receives every event (the duplication described in the question).

    Runs until interrupted with Ctrl+C.
    """
    client = EventHubConsumerClient.from_connection_string(
        connection_str, consumer_group, eventhub_name=eventhub_name
    )

    def on_event_batch(partition_context, events):
        """Handle one batch of events received from a single partition."""
        print("Received event from partition {}".format(
            partition_context.partition_id))
        if not events:
            # Empty batches are only delivered when receive_batch is given
            # max_wait_time; treat one as "nothing more to read" and stop.
            client.close()
            return
        for event in events:
            payload = event.body_as_json()  # parsed event body; process it here
        # Checkpoint once per batch at its last event, rather than once per event.
        partition_context.update_checkpoint(events[-1])

    try:
        with client:
            # No partition is pinned, so the client reads all partitions.
            # To read a single partition, pass partition_id="0"
            # (the keyword is partition_id, not PARTITION).
            client.receive_batch(
                on_event_batch=on_event_batch,
                # starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')


get_messages()

我创建了此代码的 2 个副本,分别命名为 consumer1.py 和 consumer2.py,但这两个消费者每次都会收到相同的事件。

例如,我发送了 100 个事件,希望这两个消费者并行运行,并分摊这 100 个事件,避免重复处理。该如何实现?

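最终我找到了解决方案:在同一个消费者组下创建多个消费者,它们可以并行消费事件,并在彼此之间分担负载。关键在于两点。第一,创建 EventHubConsumerClient 时传入 checkpoint_store(此处为 BlobCheckpointStore)。第二,调用 receive_batch 时不指定 partition_id。这样,同一消费者组中的各个客户端会通过 Blob 存储协调分区所有权,每个分区同一时间只由一个消费者读取。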

import time

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Event Hubs access credentials (redacted -- replace the placeholders).
connection_str = "****"
consumer_group = '$Default'
eventhub_name = "****"

# Blob Storage credentials (redacted -- replace the placeholders).
storage_connection_str = "****"
container_name = "****"
storageAccount = "****"

# Checkpoint store backed by Blob Storage. Passing it to EventHubConsumerClient
# lets several consumers in one consumer group share the partitions instead of
# each one reading all of them.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

# BlobServiceClient for the separate container that the final data is
# dumped to (in append mode).
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
container_client = blob_service_client.get_container_client('nsc-container')

# Create the data container, tolerating one that already exists.
try:
    container_client.create_container()
except ResourceExistsError:
    print("Container already exists.")

#Instantiate a new BlobClient
#blob_client = container_client.get_blob_client("data.csv")

def get_messages():
    """Receive event batches, sharing the partitions with other consumers.

    The client is created with ``checkpoint_store``, and ``receive_batch`` is
    called without ``partition_id``. As a result, every process running this
    script in the same consumer group claims a share of the partitions
    through the blob checkpoint store, instead of each process reading all
    of them. A checkpoint is written after each batch.

    For each batch, prints the partition id and the batch size. It also
    prints a running total of events received and the seconds elapsed since
    this function started. Runs until interrupted with Ctrl+C.
    """
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    # Kept outside the callback so they accumulate across batches.
    line_count = 0
    start_time = time.time()

    def on_event_batch(partition_context, events):
        """Handle one batch of events received from a single partition."""
        nonlocal line_count
        print("Received event from partition {}".format(
            partition_context.partition_id))
        cnt = len(events)
        if not events:
            # Empty batches are only delivered when receive_batch is given
            # max_wait_time; treat one as "nothing more to read" and stop.
            client.close()
        else:
            for event in events:
                payload = event.body_as_json()  # parsed event body; process it here
            print("Number of events received: ", cnt)
            # Checkpoint once per batch, at its last event. Calling
            # update_checkpoint() inside the loop issued one checkpoint-store
            # (blob) write per event.
            partition_context.update_checkpoint(events[-1])
        # NOTE(review): assumes callbacks are not run concurrently; guard
        # line_count with a lock if that assumption turns out to be false.
        line_count += cnt
        run_time = time.time() - start_time
        print("\nTotal Received {} records in {} seconds.".format(line_count, run_time))

    try:
        with client:
            # No partition_id: together with the checkpoint store this enables
            # load balancing. Specifying a partition_id would disable it.
            client.receive_batch(on_event_batch=on_event_batch)
    except KeyboardInterrupt:
        print('Stopped receiving.')


get_messages()

现在可以按需将此代码复制为多份,分别保存为 consumer_1.py、consumer_2.py 等,并同时运行。每个分区同一时间只会分配给一个消费者,因此当消费者数量超过分区数时,多出的消费者会处于空闲状态。让消费者数量与分区数相等,可以获得最佳效率。