Dotnet and Azure Event Hubs: Retrieve an event by its position
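I've been trying to retrieve an event by its position using Microsoft.Azure.EventHubs.
I was told there's a way to compute an event's position from its offset or sequence number, so every time I add an event to an EventDataBatch, I cache an ID in Redis along with the event's offset and sequence number.
Then, whenever I want to retrieve a specific event, I look up its ID in Redis, get its offset and sequence number, and hopefully use them to retrieve it from the Event Hub stream.
The problem is that the offset and sequence number are negative longs, and I can't figure out how to use them as an index.
Do you know how to do this?
Here are my publisher and retriever classes: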
public class EventHubPublisher
{
    public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid)
    {
        var producerClient = new EventHubProducerClient(_connectionString, _eventHubName);
        var eventBatch = await producerClient.CreateBatchAsync();
        var data = new EventData(Encoding.UTF8.GetBytes(_message));
        eventBatch.TryAdd(data);
        addDatatoRedis(data.SequenceNumber, data.Offset, onboardingid);
        await producerClient.SendAsync(eventBatch);
    }

    private static void addDatatoRedis(long sequenceNumber, long offset, string onboardingid)
    {
        try
        {
            var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
            var redis = redisConnection.GetDatabase();
            var value = new
            {
                sequence_number = sequenceNumber.ToString(),
                offset = offset.ToString(),
            };
            redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
        }
        catch (Exception)
        {
            throw;
        }
    }
}
public class EventHubRetriever
{
    public static async Task GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
    {
        try
        {
            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync("0");
            var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
            var eventPosition = EventPosition.FromOffset(existentEvent["offset"].ToString());
            var lastEnqueuedOffset = Convert.ToInt32(partitionRuntimeInformation.LastEnqueuedOffset);
            var offset = existentEvent["offset"];
            // var offsetPosition = lastEnqueuedOffset + offset;
            // var receiver = EventHubClient.Create();
        }
        catch (System.Exception)
        {
            throw;
        }
    }
}
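Offset and SequenceNumber are broker-owned fields that aren't populated when you create an EventData instance. The Event Hubs service assigns the offset and sequence number only after the event has been accepted and assigned to a partition. There's no way to predict what those values will be at publish time; these members are only valid when the event is consumed.
The value you're storing in Redis is long.MinValue, which corresponds to uninitialized data. (This is detailed in the Remarks section of the docs.)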
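Since the values cached in the question are the sentinel rather than real positions, one option is to refuse to cache them at all. A minimal sketch, assuming long.MinValue is the only "unassigned" marker, as described above; `BrokerAssigned` is a hypothetical helper, not part of either Event Hubs SDK:

```csharp
using System;

// Hypothetical helper: detects the "not yet assigned" sentinel that
// Offset and SequenceNumber hold on an EventData the service hasn't accepted.
public static class BrokerAssigned
{
    public const long Unassigned = long.MinValue;

    public static bool IsAssigned(long brokerValue) => brokerValue != Unassigned;

    // Throws instead of silently caching a meaningless position.
    public static long Require(long brokerValue, string fieldName)
    {
        if (!IsAssigned(brokerValue))
        {
            throw new InvalidOperationException(
                $"{fieldName} has not been assigned by the service; read it from a consumed event instead.");
        }
        return brokerValue;
    }
}
```

Calling `BrokerAssigned.Require(data.SequenceNumber, "SequenceNumber")` on the publishing side would have failed fast instead of writing long.MinValue to Redis.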
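Unfortunately, the random-access scenario you're trying to implement isn't a good fit for Event Hubs. The closest I'd get is to assign a unique identifier to an application property when you create the event, then store that identifier in Redis along with its approximate publish time (with a buffer to account for clock drift). When you want to retrieve the event, use the publish time as the starting position and read forward until you find it.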
For example:
(Disclaimer: I'm working from memory and can't compile/test this. Please forgive any syntax errors.)
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

public class EventHubPublisher : IAsyncDisposable
{
    // Placeholders for illustration; supply your own values.
    private const string ConnectionString = "<< EVENT HUBS NAMESPACE CONNECTION STRING >>";
    private const string EventHubName = "<< EVENT HUB NAME >>";

    // NOTE: You want the client instance to be static; by creating one
    //       each time that you wish to send, you're paying the cost
    //       of establishing a connection each time.
    //
    private static readonly EventHubProducerClient ProducerClient =
        new EventHubProducerClient(ConnectionString, EventHubName);

    // NOTE: You should query the Event Hub for partition identifiers; it is
    //       not safe to assume "0" as the first partition. This is used here
    //       for illustration only.
    //
    private static readonly SendEventOptions FirstPartitionOptions =
        new SendEventOptions { PartitionId = "0" };

    // NOTE: There is no benefit to using the batch overload, as you
    //       are publishing only a single event at a time. It's worth
    //       mentioning that this isn't an efficient pattern for publishing;
    //       if you can batch events, you'll see better throughput.
    //
    public async Task SendMessage(string message)
    {
        var id = Guid.NewGuid().ToString();
        var data = new EventData(Encoding.UTF8.GetBytes(message));
        data.Properties.Add("Id", id);

        await ProducerClient.SendAsync(new[] { data }, FirstPartitionOptions);

        // Allow for up to 5 minutes of clock skew. This may need to be tuned
        // depending on your environment.
        var publishTime = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(5));
        addDatatoRedis(publishTime, id);
    }

    private void addDatatoRedis(DateTimeOffset publishTime, string id)
    {
        // ... Save to Redis
    }

    public virtual async ValueTask DisposeAsync()
    {
        await ProducerClient.CloseAsync().ConfigureAwait(false);
        GC.SuppressFinalize(this);
    }
}
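I'm using Azure.Messaging.EventHubs for illustration, as it's the current generation of the Event Hubs client library. If you need to stay on the legacy version used in your question, the concepts are the same.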
using System;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Newtonsoft.Json.Linq;

public class EventHubRetriever : IAsyncDisposable
{
    // Placeholders for illustration; supply your own values.
    private const string ConnectionString = "<< EVENT HUBS NAMESPACE CONNECTION STRING >>";
    private const string EventHubName = "<< EVENT HUB NAME >>";

    // NOTE: You want the client instance to be static; by creating one
    //       each time that you wish to read, you're paying the cost
    //       of establishing a connection each time.
    //
    private static readonly EventHubConsumerClient ConsumerClient =
        new EventHubConsumerClient(
            EventHubConsumerClient.DefaultConsumerGroupName,
            ConnectionString,
            EventHubName);

    // With a maximum wait time set, the reader emits an empty PartitionEvent
    // (Data == null) when no event arrives within that window.
    private static readonly ReadEventOptions ReadOptions =
        new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(1) };

    public async Task<EventData> GetEvent(JObject persistedEvent)
    {
        var firstPartition = (await ConsumerClient.GetPartitionIdsAsync()).First();
        var publishTime = DateTimeOffset.Parse((string)persistedEvent["publishTime"]);
        var eventId = (string)persistedEvent["id"];

        await foreach (PartitionEvent partitionEvent in
            ConsumerClient.ReadEventsFromPartitionAsync(
                firstPartition,
                EventPosition.FromEnqueuedTime(publishTime),
                ReadOptions))
        {
            if (partitionEvent.Data == null)
            {
                // We're at the end of the event stream and didn't find
                // your event.
                return null;
            }

            if (partitionEvent.Data.Properties.TryGetValue("Id", out var id)
                && (id as string) == eventId)
            {
                // This is your event; returning stops the read loop.
                return partitionEvent.Data;
            }
        }

        return null;
    }

    public virtual async ValueTask DisposeAsync()
    {
        await ConsumerClient.CloseAsync().ConfigureAwait(false);
        GC.SuppressFinalize(this);
    }
}
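The retrieval above boils down to two steps: back the stored start time off by a skew buffer, then read forward and compare the application-assigned Id. A self-contained sketch of just that logic, with a plain list standing in for one partition; `StoredEvent` and `EnqueuedTimeSearch` are hypothetical names, and the real reader starts at the enqueued time rather than skipping earlier entries itself:

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an event read from a partition.
public sealed record StoredEvent(long SequenceNumber, DateTimeOffset EnqueuedTime, string Id);

public static class EnqueuedTimeSearch
{
    // The starting point saved to Redis: the local clock minus a buffer
    // that absorbs skew between this machine and the service.
    public static DateTimeOffset StartingPoint(DateTimeOffset localNow, TimeSpan skewBuffer) =>
        localNow - skewBuffer;

    // Reads forward from the first event enqueued at or after `start` and
    // returns the first one whose Id matches, or null at the end of the stream.
    public static StoredEvent? FindById(IReadOnlyList<StoredEvent> partition, DateTimeOffset start, string id)
    {
        foreach (var evt in partition)
        {
            if (evt.EnqueuedTime < start)
            {
                continue; // Before the starting position.
            }
            if (evt.Id == id)
            {
                return evt;
            }
        }
        return null;
    }
}
```

The buffer is a trade-off: a larger value tolerates more clock skew but means reading past more unrelated events before the match.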
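Well, I'm not sure I'm doing this right, but it seems to work.
When I'm about to publish a message, I get LastEnqueuedSequenceNumber from GetPartitionRuntimeInformationAsync, add 1, and add the result to the EventData as a property.
Since I also add the event to the Redis cache, I can later retrieve its sequenceNumber.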
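The answer above points out that the service assigns sequence numbers only when it accepts an event, so "last + 1" is a prediction. A hypothetical in-memory model (not how the service is implemented, and numbering from 0 by assumption) shows where that prediction can go wrong when two publishers read the partition before either one sends:

```csharp
using System.Collections.Generic;

// Hypothetical model of one partition: like the service, it assigns the
// sequence number when the event is appended, not when it is created.
public sealed class InMemoryPartition
{
    private readonly List<string> _ids = new();

    public long LastEnqueuedSequenceNumber => _ids.Count - 1;

    public long Append(string id)
    {
        _ids.Add(id);
        return _ids.Count - 1;
    }

    public string IdAt(long sequenceNumber) => _ids[(int)sequenceNumber];
}

public static class PredictionRace
{
    // Publishers A and B each predict "last + 1" before either one sends.
    public static (long PredictedA, long ActualA, long PredictedB, long ActualB) Run(InMemoryPartition partition)
    {
        long predictedA = partition.LastEnqueuedSequenceNumber + 1;
        long predictedB = partition.LastEnqueuedSequenceNumber + 1;
        long actualA = partition.Append("A");
        long actualB = partition.Append("B");
        return (predictedA, actualA, predictedB, actualB);
    }
}
```

Both publishers predict the same number, so B caches a sequence number that actually belongs to A's event. With a single publisher per partition the prediction holds, which may be why the approach appears to work.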
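using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using StackExchange.Redis;

public class EventHubPublisher
{
    public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid, string partition)
    {
        var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
        var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync(partition);

        // NOTE: This is a prediction, not the assigned value. If another publisher
        //       writes to this partition first, the service assigns a different number.
        var sequenceNumber = partitionRuntimeInformation.LastEnqueuedSequenceNumber + 1;

        var data = new EventData(Encoding.UTF8.GetBytes(_message));
        data.Properties.Add("Id", onboardingid);
        data.Properties.Add("Message", _message);
        data.Properties.Add("SequenceNumber", sequenceNumber);

        // Send to the partition queried above; client.SendAsync(data) would let
        // the service choose an arbitrary partition.
        var sender = client.CreatePartitionSender(partition);
        await sender.SendAsync(data);
        await sender.CloseAsync();
        await client.CloseAsync();

        // NOTE: data.Body.Offset is the start of the body within its byte array,
        //       not the Event Hubs offset assigned by the service.
        await addDatatoRedis(onboardingid, sequenceNumber, data.Body.Offset, _eventHubName);
    }

    private static async Task addDatatoRedis(string onboardingid, long sequenceNumber, int offset, string topic)
    {
        // NOTE: ConnectionMultiplexer is meant to be created once and reused.
        var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
        var redis = redisConnection.GetDatabase();
        var value = new
        {
            offset = offset,
            id = onboardingid,
            sequenceNumber = sequenceNumber,
            topic = topic
        };
        await redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
    }
}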
Then, when retrieving the event from the Event Hub, I take the sequenceNumber from the cached entry and fetch the event at that position.
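using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Newtonsoft.Json.Linq;

public class EventHubRetriever
{
    public static async Task<EventData> GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
    {
        var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
        try
        {
            var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
            var partitionsIds = eventHubRunTimeInformation.PartitionIds;
            var sequenceNumber = (long)existentEvent["sequenceNumber"];
            var id = (string)existentEvent["id"];
            return await GetEventByPosition(client, sequenceNumber, id, partitionsIds);
        }
        catch (Exception exc)
        {
            throw new EventRetrieveException(exc, "An error occurred while retrieving data from Event Hub.");
        }
        finally
        {
            await client.CloseAsync();
        }
    }

    // Sequence numbers are unique only within a partition, so several partitions
    // can have an event at this number; the "Id" property picks the right one.
    private static async Task<EventData> GetEventByPosition(EventHubClient client, long sequenceNumber, string id, string[] partitionsIds)
    {
        var eventPosition = EventPosition.FromSequenceNumber(sequenceNumber, true);
        foreach (var partitionId in partitionsIds)
        {
            var receiver = client.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, eventPosition);
            try
            {
                // ReceiveAsync returns null when no event arrives within the wait time.
                var events = await receiver.ReceiveAsync(1) ?? Enumerable.Empty<EventData>();
                var match = events.FirstOrDefault(e =>
                    e.Properties.TryGetValue("Id", out var value) && (value as string) == id);
                if (match != null)
                {
                    return match;
                }
            }
            finally
            {
                await receiver.CloseAsync();
            }
        }
        return null;
    }
}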
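Because each partition numbers its events independently, a sequence number on its own doesn't identify an event; the partition it landed in has to be part of the cached position. A minimal in-memory sketch, assuming each partition numbers from 0; `CachedPosition` and `PartitionedLookup` are hypothetical names, with lists standing in for partitions:

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical cache entry: the partition id is part of the position.
public sealed record CachedPosition(string PartitionId, long SequenceNumber);

public static class PartitionedLookup
{
    // Each partition is modeled as a list whose index is the sequence number.
    public static string? FindId(
        IReadOnlyDictionary<string, IReadOnlyList<string>> partitions,
        CachedPosition position)
    {
        if (!partitions.TryGetValue(position.PartitionId, out var events))
        {
            return null; // Unknown partition.
        }
        if (position.SequenceNumber < 0 || position.SequenceNumber >= events.Count)
        {
            return null; // No event at that position in this partition.
        }
        return events[(int)position.SequenceNumber];
    }

    // How many partitions hold an event at this sequence number, i.e. how
    // many candidates a lookup by sequence number alone can return.
    public static int CandidatesFor(
        IReadOnlyDictionary<string, IReadOnlyList<string>> partitions,
        long sequenceNumber)
    {
        var count = 0;
        foreach (var events in partitions.Values)
        {
            if (sequenceNumber >= 0 && sequenceNumber < events.Count)
            {
                count++;
            }
        }
        return count;
    }
}
```

With two partitions of two events each, sequence number 1 matches one event in each partition, while ("1", 1) matches exactly one.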