MassTransit - 限制在多消费者设置中同时处理哪些消息

MassTransit - Limit which messages get processed concurrently in multi-consumer setup

目前,我有一个 windows 服务使用 RabbitMQ 来处理来自 Web 应用程序的异步消息。此服务使用消息上的实体 ID 来查找需要处理的实体。该消息目前并未指示应对该实体执行的操作,因为这已由内部操作日志提供。为了帮助转移到多个竞争的消费者设置,我正在考虑通过 RabbitMQ 实现 MassTransit。

我正在研究如何防止不同消费者同时处理具有相同实体 ID 的多条消息。 MassTransit 中是否有任何内置功能可以让我处理这种情况或 suggestion/resource 如何处理?

我查看了最新的过滤器和绿色管道分区过滤器。要么这些不符合我的要求,要么我在我的测试解决方案中配置不正确。我还考虑过锁定实体,以便第二次以上并发处理实体的尝试将等到第一次完成,但我真的不想让一个或多个消费者等待,如果我没有的话。

分区程序应该完全满足您的需求,前提是您指定了密钥提供程序,以便它 returns 相同实体的相同标识符。

这个单元测试展示了分区程序是如何设置的: https://github.com/MassTransit/MassTransit/blob/master/src/MassTransit.Tests/Pipeline/PartitionByKey_Specs.cs

关键是:

configurator.Consumer(() => new PartitionedConsumer(_completed), x =>
{
    x.Message<PartitionedMessage>(m =>
    {
        m.UsePartitioner(8, context => context.Message.CorrelationId);
    });
});