使用 RabbitMQ Publish/Subscribe 微服务事件总线时如何去重事件

How to deduplicate events when using RabbitMQ Publish/Subscribe Microservice Event Bus

我已经阅读了第 58 页的 This Book 以了解如何在微服务之间进行异步事件集成。

使用 RabbitMQ 和 publish/subscribe 模式有助于将事件推送给订阅者。但是,考虑到微服务架构和 docker 用法,我希望微服务实例不止一次 'type' 运行。据我了解,所有实例都会订阅该事件,因此都会收到它。

这本书没有清楚地解释如何确保只有一个实例处理请求。

我查看了重复部分,但它描述了一种模式,该模式解释了如何在服务实例中进行重复数据删除,但不一定针对它们...

每个微服务实例都将使用类似于以下内容的订阅:

public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                                    exchange: BROKER_NAME,
                                    routingKey: eventName);
            }
        }

        _subsManager.AddSubscription<T, TH>();
    }

我需要了解同一 'type' 微服务的多个微服务实例如何在处理过程中服务出现故障时删除重复数据而不丢失消息。

From what I understand all instances will subscribe to the event and therefore would all receive it.

只有一个订阅者实例会处理 message/event。当您有多个服务实例 运行 并订阅了同一个订阅时,第一个选择消息的实例会将消息设置为从订阅中不可见(称为可见性超时)。如果服务实例能够在给定时间内处理该消息,它将通知队列删除该消息,如果它不能及时处理该消息,该消息将重新出现在队列中以供任何实例再次拾取它.

所有标准服务总线(rabbitMQ、SQS、Azure Serivce 总线等)都提供开箱即用的此功能。

顺便说一下,我已经阅读了这本书并使用了来自 eShotContainers 的上述代码,它按照我描述的方式工作。

你也应该看看下面的模式 Competing Consumers pattern

希望对您有所帮助!