.NET 和 Azure 事件中心：按位置检索事件

Dotnet and Azure Event Hubs: Retrieve an event by its position

我一直在尝试使用 Microsoft.Azure.EventHubs 按位置检索事件。

有人告诉我，可以使用偏移量或序列号来计算事件的位置，所以每次将事件添加到 EventDataBatch 时，我都会在 Redis 中缓存一个 ID 以及该事件的偏移量和序列号。

然后，每当我想检索某个特定事件时，就在 Redis 中按 ID 查找它，取出它的偏移量和序列号，再据此在事件中心的流中检索该事件。

问题是偏移量和序列号都是负的 long 值，我不明白如何把它们用作索引。

你们知道该怎么做吗？

以下是我的发布者类和检索者类：

    /// <summary>
    /// Publishes a single message to an Event Hub and records a lookup entry for it in Redis.
    /// </summary>
    public class EventHubPublisher
    {
        // ConnectionMultiplexer is meant to be created once and shared. The previous code
        // connected on every call and never disposed the connection.
        private static readonly Lazy<ConnectionMultiplexer> RedisConnection =
            new Lazy<ConnectionMultiplexer>(
                () => ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis));

        /// <summary>
        /// Sends <paramref name="_message"/> as one UTF-8 event, then stores a Redis record keyed by
        /// <paramref name="onboardingid"/>.
        /// </summary>
        /// <remarks>
        /// SequenceNumber and Offset are assigned by the Event Hubs service only after it accepts
        /// an event. On a locally constructed EventData they hold long.MinValue, so the values
        /// stored here are placeholders and cannot locate the event. The onboarding id is attached
        /// as the "Id" application property so the event can be recognised when it is read back.
        /// </remarks>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Target Event Hub name.</param>
        /// <param name="_message">Message text; encoded as UTF-8 for the event body.</param>
        /// <param name="onboardingid">Redis key and value of the "Id" application property.</param>
        /// <exception cref="InvalidOperationException">The message does not fit in a batch.</exception>
        public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid)
        {
            // The producer owns a service connection; dispose it so each call does not leak one.
            await using var producerClient = new EventHubProducerClient(_connectionString, _eventHubName);
            using var eventBatch = await producerClient.CreateBatchAsync();

            var data = new EventData(Encoding.UTF8.GetBytes(_message));
            data.Properties["Id"] = onboardingid;

            // TryAdd returns false when the event is too large. Ignoring that result would
            // leave this event out of the batch without any error.
            if (!eventBatch.TryAdd(data))
            {
                throw new InvalidOperationException(
                    $"The message for '{onboardingid}' is too large to fit in an Event Hubs batch.");
            }

            await producerClient.SendAsync(eventBatch);

            // Write to Redis only after the send succeeds, so Redis never records an event
            // that failed to publish.
            addDatatoRedis(data.SequenceNumber, data.Offset, onboardingid);
        }

        /// <summary>
        /// Stores <c>{ sequence_number, offset }</c> (both as strings) in Redis as JSON under
        /// <paramref name="onboardingid"/>.
        /// </summary>
        private static void addDatatoRedis(long sequenceNumber, long offset, string onboardingid)
        {
            var redis = RedisConnection.Value.GetDatabase();
            var value = new
            {
                // Invariant culture: these values can be negative, and the minus sign is culture-specific.
                sequence_number = sequenceNumber.ToString(System.Globalization.CultureInfo.InvariantCulture),
                offset = offset.ToString(System.Globalization.CultureInfo.InvariantCulture),
            };

            // A synchronous write surfaces failures; the un-awaited StringSetAsync discarded them.
            redis.StringSet(onboardingid, JsonSerializer.Serialize(value));
        }
    }
public class EventHubRetriever
    {
        /// <summary>
        /// Validates the offset cached for an event against partition "0" and builds the
        /// EventPosition from which that event would be read.
        /// </summary>
        /// <remarks>
        /// Receiving the event is not implemented yet; the previous version only had
        /// commented-out placeholders. Offsets captured before publishing are long.MinValue,
        /// because the service assigns offsets only after accepting an event. Such values are
        /// rejected here instead of being passed to the service.
        /// </remarks>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Event Hub name, appended as <c>entityPath</c>.</param>
        /// <param name="existentEvent">Cached record; must contain an "offset" value.</param>
        /// <exception cref="ArgumentNullException"><paramref name="existentEvent"/> is null.</exception>
        /// <exception cref="ArgumentException">The cached offset is missing, not an integer, negative,
        /// or beyond the partition's last enqueued offset.</exception>
        public static async Task GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
        {
            if (existentEvent is null)
            {
                throw new ArgumentNullException(nameof(existentEvent));
            }

            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            try
            {
                // NOTE(review): assumes the event was published to partition "0"; confirm against the publisher.
                var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync("0");

                // Offsets are 64-bit; Convert.ToInt32 overflowed for any offset above int.MaxValue.
                var lastEnqueuedOffset = long.Parse(
                    partitionRuntimeInformation.LastEnqueuedOffset,
                    System.Globalization.CultureInfo.InvariantCulture);

                var offsetText = existentEvent["offset"]?.ToString();
                if (!long.TryParse(offsetText, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out var offset)
                    || offset < 0
                    || offset > lastEnqueuedOffset)
                {
                    throw new ArgumentException(
                        $"The cached offset '{offsetText}' does not identify an event in partition 0.",
                        nameof(existentEvent));
                }

                // Inclusive, so that a receiver started here yields the event at this offset itself.
                var eventPosition = EventPosition.FromOffset(offsetText, true);

                // TODO: create a receiver at eventPosition and read the event.
            }
            finally
            {
                await client.CloseAsync();
            }
        }
    }

Offset 和 SequenceNumber 是由代理（broker）管理的字段，在您创建 EventData 实例时并不会被填充。只有在事件被服务接受并分配到某个分区之后，事件中心服务才会为其分配偏移量和序列号。发布时无法预测这些值；这些成员只有在消费事件时才有效。

您存储在 Redis 中的值是 long.MinValue，表示未初始化的数据（EventData 文档的 Remarks 部分对此有详细说明）。

不幸的是，您要实现的随机访问方案并不适合事件中心。我能想到的最接近的做法是：在创建事件时为其分配一个带唯一标识符的应用程序属性（application property），然后将该标识符及其大致发布时间（留出一定缓冲以应对时钟漂移）存储在 Redis 中。当您想要检索该事件时，以该发布时间作为起点向前读取，直到找到该事件为止。

例如:

（免责声明：以下代码凭记忆写成，我无法编译或测试，请原谅其中的语法错误。）

    /// <summary>
    /// Publishes single events that carry a unique "Id" application property, and records an
    /// approximate, skew-adjusted publish time for later lookup.
    /// </summary>
    public class EventHubPublisher : IAsyncDisposable
    {
        // NOTE: You want the client instance to be static; by creating
        //       each time that you wish to send, you're paying the cost
        //       of establishing a connection each time.
        //
        // NOTE(review): _connectionString, _eventHubName and _message are not declared
        //       in this snippet; they are assumed to be static or const members defined
        //       elsewhere.
        //
        private static readonly EventHubProducerClient ProducerClient =
            new EventHubProducerClient(
                _connectionString,
                _eventHubName);

        // NOTE: You should query the Event Hub for partition identifiers; it is
        //       not safe to assume "0" as the first partition.  This is used here
        //       for illustration only.
        //
        private static readonly SendEventOptions FirstPartitionOptions =
            new SendEventOptions { PartitionId = "0" };

        /// <summary>
        /// Sends <c>_message</c> as one event to partition "0", then records its id together with
        /// a publish time moved back five minutes to absorb clock skew.
        /// </summary>
        /// <remarks>
        /// There is no benefit to the batch overload when publishing one event at a time. This is
        /// not an efficient publishing pattern; batching events gives better throughput.
        /// </remarks>
        public async Task SendMessage()
        {
            // Keep the id in a typed local. Properties stores values as object, so reading the id
            // back from the dictionary would need a cast before passing it as a string.
            var id = Guid.NewGuid().ToString();

            var data = new EventData(Encoding.UTF8.GetBytes(_message));
            data.Properties.Add("Id", id);

            await ProducerClient.SendAsync(new[] { data }, FirstPartitionOptions);

            // Allow for up to 5 minutes of clock skew.  This may need to be tuned
            // depending on your environment.
            var publishTime = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(5));
            addDatatoRedis(publishTime, id);
        }

        /// <summary>Persists the id and lower-bound publish time (left as a stub in this sample).</summary>
        private void addDatatoRedis(DateTimeOffset enqueueTime, string id)
        {
            // ... Save to Redis
        }

        /// <summary>Closes the shared producer client.</summary>
        /// <remarks>
        /// NOTE(review): ProducerClient is static, so disposing any one instance closes the
        /// client for every other instance as well.
        /// </remarks>
        public virtual async ValueTask DisposeAsync()
        {
            await ProducerClient.CloseAsync().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }
    }

这里我使用的是 Azure.Messaging.EventHubs，它是当前一代的事件中心客户端库。如果您需要像问题中那样继续使用旧版本（Microsoft.Azure.EventHubs），概念是相同的。

    /// <summary>
    /// Finds a previously published event by reading a partition forward from an approximate
    /// publish time and matching the "Id" application property.
    /// </summary>
    public class EventHubRetriever : IAsyncDisposable
    {
        // NOTE: You want the client instance to be static; by creating
        //       one each time that you wish to read, you're paying the cost
        //       of establishing a connection each time.
        //
        // NOTE(review): _connectionString and _eventHubName are not declared in this
        //       snippet; they are assumed to be static or const members defined elsewhere.
        //
        private static readonly EventHubConsumerClient ConsumerClient =
            new EventHubConsumerClient(
                EventHubConsumerClient.DefaultConsumerGroupName,
                _connectionString,
                _eventHubName);

        // With a maximum wait time set, the read loop yields a PartitionEvent with no Data
        // when nothing arrives in time. The loop below uses that to detect the end of the stream.
        private static readonly ReadEventOptions ReadOptions =
            new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(1) };

        /// <summary>
        /// Reads the first partition from <c>persistedEvent["publishTime"]</c> onward until it finds
        /// the event whose "Id" property equals <c>persistedEvent["id"]</c>, or reaches the end of the stream.
        /// </summary>
        /// <param name="persistedEvent">Cached record with "publishTime" and "id" values.</param>
        public async Task GetEvent(JObject persistedEvent)
        {
            var firstPartition = (await ConsumerClient.GetPartitionIdsAsync()).First();

            // JToken's explicit conversions accept both string and date token values.
            var publishTime = (DateTimeOffset)persistedEvent["publishTime"];
            var eventId = (string)persistedEvent["id"];

            await foreach (PartitionEvent partitionEvent in
                ConsumerClient.ReadEventsFromPartitionAsync(
                    firstPartition,
                    EventPosition.FromEnqueuedTime(publishTime),
                    ReadOptions))
            {
                if (partitionEvent.Data == null)
                {
                    // We're at the end of the event stream and didn't find
                    // your event.
                    break;
                }

                // Other events in the partition may lack an "Id" property, so use TryGetValue
                // rather than the indexer. Compare as strings; comparing the object to a JToken
                // with '==' only checked references and could never match.
                if (partitionEvent.Data.Properties.TryGetValue("Id", out var id)
                    && id is string candidateId
                    && string.Equals(candidateId, eventId, StringComparison.Ordinal))
                {
                    // This is your event; process partitionEvent.Data here.
                    break;
                }
            }
        }

        /// <summary>Closes the shared consumer client.</summary>
        /// <remarks>
        /// NOTE(review): ConsumerClient is static, so disposing any one instance closes the
        /// client for every other instance as well.
        /// </remarks>
        public virtual async ValueTask DisposeAsync()
        {
            await ConsumerClient.CloseAsync().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }
    }

嗯，我不确定我的做法是否正确，但它似乎奏效了。发布消息时，我通过 GetPartitionRuntimeInformationAsync 获取 LastEnqueuedSequenceNumber，加 1 后将其作为属性添加到 EventData 中。由于我也把该事件的信息写入了 Redis 缓存，因此之后能够取回它的 sequenceNumber。

    public class EventHubPublisher
    {
        /// <summary>
        /// Sends <paramref name="_message"/> to <paramref name="partition"/>, tagging it with the
        /// sequence number it is expected to receive, and records that number in Redis.
        /// </summary>
        /// <remarks>
        /// The sequence number is predicted as the partition's last enqueued sequence number + 1.
        /// The prediction only holds when nothing else publishes to the same partition between
        /// the runtime query and the send. With concurrent publishers it can be wrong.
        /// </remarks>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Event Hub name; also stored as the record's "topic".</param>
        /// <param name="_message">Message text; UTF-8 body and "Message" property.</param>
        /// <param name="onboardingid">Redis key and value of the "Id" property.</param>
        /// <param name="partition">Partition to publish to and to read the sequence number from.</param>
        public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid, string partition)
        {
            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            try
            {
                var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync(partition);
                var sequenceNumber = partitionRuntimeInformation.LastEnqueuedSequenceNumber + 1;

                var data = new EventData(Encoding.UTF8.GetBytes(_message));

                data.Properties.Add("Id", onboardingid);
                data.Properties.Add("Message", _message);
                data.Properties.Add("SequenceNumber", sequenceNumber);

                // client.SendAsync(data) lets the service choose the partition, which breaks the
                // sequence number predicted for `partition`. Send to that partition explicitly.
                var sender = client.CreatePartitionSender(partition);
                try
                {
                    await sender.SendAsync(data);
                }
                finally
                {
                    await sender.CloseAsync();
                }

                addDatatoRedis(onboardingid, sequenceNumber, data.Body.Offset, _eventHubName);
            }
            finally
            {
                // The client holds a connection; close it so each call does not leak one.
                await client.CloseAsync();
            }
        }
        // ConnectionMultiplexer is meant to be created once and shared. The previous code
        // connected on every call and never disposed the connection.
        private static readonly Lazy<ConnectionMultiplexer> SharedRedisConnection =
            new Lazy<ConnectionMultiplexer>(
                () => ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis));

        /// <summary>
        /// Stores <c>{ offset, id, sequenceNumber, topic }</c> as JSON in Redis under
        /// <paramref name="onboardingid"/>.
        /// </summary>
        /// <param name="onboardingid">Redis key; also stored as "id".</param>
        /// <param name="sequenceNumber">Sequence number stored for later lookup.</param>
        /// <param name="offset">Stored as "offset". NOTE(review): the caller in this file passes
        /// <c>data.Body.Offset</c>, which is an offset into the body's byte array and not the
        /// event's position in the partition.</param>
        /// <param name="topic">Event Hub name, stored as "topic".</param>
        private static void addDatatoRedis(string onboardingid, long sequenceNumber, int offset, string topic)
        {
            var redis = SharedRedisConnection.Value.GetDatabase();
            var value = new
            {
                offset = offset,
                id = onboardingid,
                sequenceNumber = sequenceNumber,
                topic = topic
            };

            // A synchronous write surfaces failures; the un-awaited StringSetAsync discarded them.
            redis.StringSet(onboardingid, JsonSerializer.Serialize(value));
        }

然后，从事件中心检索事件时，我可以从缓存的记录中取得 sequenceNumber，并通过这个位置在事件中心上获取该事件。

    /// <summary>
    /// Retrieves an event by the sequence number cached for it in Redis.
    /// </summary>
    public class EventHubRetriever
    {
        // CreateReceiver's first argument is a consumer group name, not the Event Hub name.
        private const string ConsumerGroup = "$Default";

        /// <summary>
        /// Reads the event at <c>existentEvent["sequenceNumber"]</c> and returns the one whose "Id"
        /// property matches <c>existentEvent["id"]</c>.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Event Hub name, appended as <c>entityPath</c>.</param>
        /// <param name="existentEvent">Cached record with "sequenceNumber" and, ideally, "id".</param>
        /// <returns>The matching event.</returns>
        /// <exception cref="EventRetrieveException">Any failure, including no matching event.</exception>
        public static async Task<EventData> GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
        {
            EventHubClient client = null;
            try
            {
                client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
                var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
                var partitionsIds = eventHubRunTimeInformation.PartitionIds;

                var sequenceNumber = (long)existentEvent["sequenceNumber"];
                var eventId = (string)existentEvent["id"];

                return await GetEventByPositionAsync(client, sequenceNumber, partitionsIds, eventId);
            }
            catch (Exception exc)
            {
                throw new EventRetrieveException(exc, "An error ocurred while retrieving data from Event Hub.");
            }
            finally
            {
                if (client != null)
                {
                    await client.CloseAsync();
                }
            }
        }

        /// <summary>
        /// Reads one event at <paramref name="sequenceNumber"/> from each partition in turn and
        /// returns the first whose "Id" property equals <paramref name="eventId"/>. If
        /// <paramref name="eventId"/> is null, it returns the first event received.
        /// </summary>
        /// <remarks>
        /// Sequence numbers are per-partition, so several partitions can hold an event at the
        /// same number. The Id check picks the intended one.
        /// </remarks>
        /// <exception cref="InvalidOperationException">No partition yields a matching event.</exception>
        private static async Task<EventData> GetEventByPositionAsync(EventHubClient client, long sequenceNumber, string[] partitionsIds, string eventId)
        {
            var eventPosition = EventPosition.FromSequenceNumber(sequenceNumber, true);

            foreach (var partitionId in partitionsIds)
            {
                var receiver = client.CreateReceiver(ConsumerGroup, partitionId, eventPosition);
                try
                {
                    // Awaited rather than .Result, which blocked a thread and risked deadlock.
                    var received = await receiver.ReceiveAsync(1);

                    // ReceiveAsync can return null when no event arrives before its timeout.
                    if (received == null)
                    {
                        continue;
                    }

                    foreach (var candidate in received)
                    {
                        if (eventId == null
                            || (candidate.Properties.TryGetValue("Id", out var id)
                                && id is string candidateId
                                && string.Equals(candidateId, eventId, StringComparison.Ordinal)))
                        {
                            return candidate;
                        }
                    }
                }
                finally
                {
                    await receiver.CloseAsync();
                }
            }

            throw new InvalidOperationException(
                $"No event with sequence number {sequenceNumber} matched the cached id.");
        }
    }