如何将 EventProcessorClient 配置为只读取特定分区键(而不是分区 ID)的事件?
How to configure EventProcessorClient to read events only for a particular partition key (not partition id)?
我有一个带有 2 个分区的事件中心,并使用以下代码使用不同的分区键向它发送事件(基于 https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs 上的文档)。我正在使用 .NET 的 Azure.Messaging.EventHubs 库(使用 .net core 3.1)
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionA" });
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second")));
await produce.SendAsync(eventBatch);
using EventDataBatch eventBatch2 = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionB" });
eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third")));
eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Fourth")));
await producer.SendAsync(eventBatch2);
}
如您所见,我使用分区键作为 MyPartitionA 发送了包含 2 个事件的第一批,使用分区键作为 MyPartitionB 发送了包含 2 个事件的第二批。有趣的是,来自两个分区键的事件都进入了同一个分区(即事件中心上的分区 0)。
在接收端,我正在尝试使用 https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor#start-and-stop-processing 中的代码示例,如下所示(我正在使用 Azure.Messaging.EventHubs..NET 的处理器库。)
async Task processEventHandler(ProcessEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an event
await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
}
catch
{
// Handle the exception from handler code
}
}
async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an error
await DoSomethingWithTheError(eventArgs.Exception);
}
catch
{
// Handle the exception from handler code
}
}
private async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}
}
如何在上面的代码中找不到只接收给定分区(比如 MyPartitionA)的事件而不接收来自其他分区(比如 MyPartitionB)的事件的方法。
- 是否可以注册处理器以根据特定分区键(而不是分区 ID)接收事件?
- 如果具有分区键 MyPartitionA 和 MyPartitionB 的事件都被发送到事件中心的分区 0,是否仍然可以只接收单个分区键(例如 MyPartitionA)的事件而不接收其他没有分区键的事件相同的分区键,即使它们可能驻留在事件中心的相同分区中?
EventHubs 是一种高吞吐量持久流,它提供 stream-level 语义(与服务总线相比)
现在回答您的问题,您正在寻找基于事件的 属性 过滤事件 - 事件级操作 - 不是流级操作。
没有直接的方法可以达到您的要求。
该方法是为您自己实施自定义解决方案 - 是从 EventHubs(事件流)中拉取事件,并通过 PartitionKey 进行中间过程 t0 过滤,并将它们推送到另一个事件中心或替代机制,以便它可以相应消耗。
或者,如果您正在考虑使用服务总线(如果这符合您的要求),那么您可以参考以下内容:
The partition key property will be used to identify the partition
within the Queue where the message must be stored when the session-id
property of the message is not set.
您可以使用 AcceptMessageSession([PartitionKey])
从特定分区键或会话 ID 接收消息
参考:AcceptMessageSession(String)
var partitionlistener = qc.AcceptMessageSession("MyPartitionA");
var message = partitionlistener.Receive();
qc = Queue client
您无法使用 SDK 中的任何客户端读取基于分区键的事件。
分区键是一个综合概念,发布后不会为事件保留。当您使用分区键发布时,该键被散列并且结果值用于 select 一个分区以将事件路由到;它的目的是确保相关事件被路由到同一个分区,但不需要了解哪个分区被 selected 并且不提供任何公平分配的保证。
要完成您要执行的过滤,您可能希望将分区键存储为事件的 application property,然后将该值用作 ProcessEventAsync
中的过滤器处理程序。请注意,您将收到来自所有分区的所有事件 - 这是 EventProcessorClient
.
的主要目标
我认为我们对您的应用程序场景的上下文了解不够,无法帮助确定最佳方法,但根据我们所知道的,我建议考虑替代方案。由于您似乎需要显式读取一组事件,因此使用其 Id 而不是密钥发布到 well-known 分区可能会有所帮助。然后,您将能够使用 EventHubConsumerClient::ReadEventsFromPartitionAsync
方法专门从该分区读取事件。当然,这还需要您明确控制在应用程序中发布其他事件的位置,以确保它们被路由到您的第二个分区。
我有一个带有 2 个分区的事件中心,并使用以下代码使用不同的分区键向它发送事件(基于 https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs 上的文档)。我正在使用 .NET 的 Azure.Messaging.EventHubs 库(使用 .net core 3.1)
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionA" });
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second")));
await produce.SendAsync(eventBatch);
using EventDataBatch eventBatch2 = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionB" });
eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third")));
eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Fourth")));
await producer.SendAsync(eventBatch2);
}
如您所见,我使用分区键作为 MyPartitionA 发送了包含 2 个事件的第一批,使用分区键作为 MyPartitionB 发送了包含 2 个事件的第二批。有趣的是,来自两个分区键的事件都进入了同一个分区(即事件中心上的分区 0)。
在接收端,我正在尝试使用 https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor#start-and-stop-processing 中的代码示例,如下所示(我正在使用 Azure.Messaging.EventHubs..NET 的处理器库。)
async Task processEventHandler(ProcessEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an event
await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
}
catch
{
// Handle the exception from handler code
}
}
async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an error
await DoSomethingWithTheError(eventArgs.Exception);
}
catch
{
// Handle the exception from handler code
}
}
private async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}
}
如何在上面的代码中找不到只接收给定分区(比如 MyPartitionA)的事件而不接收来自其他分区(比如 MyPartitionB)的事件的方法。
- 是否可以注册处理器以根据特定分区键(而不是分区 ID)接收事件?
- 如果具有分区键 MyPartitionA 和 MyPartitionB 的事件都被发送到事件中心的分区 0,是否仍然可以只接收单个分区键(例如 MyPartitionA)的事件而不接收其他没有分区键的事件相同的分区键,即使它们可能驻留在事件中心的相同分区中?
EventHubs 是一种高吞吐量持久流,它提供 stream-level 语义(与服务总线相比)
现在回答您的问题,您正在寻找基于事件的 属性 过滤事件 - 事件级操作 - 不是流级操作。
没有直接的方法可以达到您的要求。
该方法是为您自己实施自定义解决方案 - 是从 EventHubs(事件流)中拉取事件,并通过 PartitionKey 进行中间过程 t0 过滤,并将它们推送到另一个事件中心或替代机制,以便它可以相应消耗。
或者,如果您正在考虑使用服务总线(如果这符合您的要求),那么您可以参考以下内容:
The partition key property will be used to identify the partition within the Queue where the message must be stored when the session-id property of the message is not set.
您可以使用 AcceptMessageSession([PartitionKey])
从特定分区键或会话 ID 接收消息参考:AcceptMessageSession(String)
var partitionlistener = qc.AcceptMessageSession("MyPartitionA");
var message = partitionlistener.Receive();
qc = Queue client
您无法使用 SDK 中的任何客户端读取基于分区键的事件。
分区键是一个综合概念,发布后不会为事件保留。当您使用分区键发布时,该键被散列并且结果值用于 select 一个分区以将事件路由到;它的目的是确保相关事件被路由到同一个分区,但不需要了解哪个分区被 selected 并且不提供任何公平分配的保证。
要完成您要执行的过滤,您可能希望将分区键存储为事件的 application property,然后将该值用作 ProcessEventAsync
中的过滤器处理程序。请注意,您将收到来自所有分区的所有事件 - 这是 EventProcessorClient
.
我认为我们对您的应用程序场景的上下文了解不够,无法帮助确定最佳方法,但根据我们所知道的,我建议考虑替代方案。由于您似乎需要显式读取一组事件,因此使用其 Id 而不是密钥发布到 well-known 分区可能会有所帮助。然后,您将能够使用 EventHubConsumerClient::ReadEventsFromPartitionAsync
方法专门从该分区读取事件。当然,这还需要您明确控制在应用程序中发布其他事件的位置,以确保它们被路由到您的第二个分区。