消费者可以处理的 MassTransit 过滤消息

MassTransit Filtering messages a consumer can handle

我有一个像这样的通用消息界面:

public interface IMyMessage
{
     int EventCode {get;}
}

现在我有多个消费者处理这条消息:

public class MyConsumer1: IConsumer<IMyMessage>{...}
public class MyConsumer2: IConsumer<IMyMessage>{...}

我希望 MyConsumer1 只处理那些 EventCode==1 的消息,让 MyConsumer2 处理 EventCode==2 的所有消息。

我知道我可以在 Consume 方法中做一个 if 语句,但想知道是否有更好的方法,比如一些路由过滤器?

我的首选方法是创建一个属性,即。 HandlesEventCodeAttribute(1) 并将其应用于消费者。

我还使用 Autofac 容器与 MassTransit 集成。

请帮忙

谢谢

Before I give any input on the actual question, I would ask why you'd use the same message type with a property to determine which consumers actually consume the message. There are better (more efficient) methods available, such as using DIRECT exchanges with RabbitMQ.

您可以创建自己的属性,并创建一个 middleware filter 来查看消费者,查看它是否具有自定义属性,然后使用该属性的值来检查消息并对其进行过滤如果消费者不感兴趣

完整的工作示例如下所示:

首先,创建属性。

class EventCodeAttribute :
    Attribute
{
    public int EventCode { get; }

    public EventCodeAttribute(int eventCode)
    {
        EventCode = eventCode;
    }
}

消息类型:

interface IEventMessage
{
    int EventCode { get; }
}

中间件过滤器:

class EventCodeFilter<TConsumer> :
    IFilter<ConsumerConsumeContext<TConsumer, IEventMessage>>
    where TConsumer : class
{
    readonly int _eventCode;

    public EventCodeFilter()
    {
        var attribute = typeof(TConsumer).GetCustomAttribute<EventCodeAttribute>();
        if (attribute == null)
            throw new ArgumentException("Message does not have the attribute required");

        _eventCode = attribute.EventCode;
    }

    public async Task Send(ConsumerConsumeContext<TConsumer, IEventMessage> context, IPipe<ConsumerConsumeContext<TConsumer, IEventMessage>> next)
    {
        if (context.Message.EventCode.Equals(_eventCode))
        {
            await next.Send(context);
        }
    }

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("eventCode");
        scope.Add("code", _eventCode);
    }
}

样本消费者:

[EventCode(27)]
class EventCodeConsumer :
    IConsumer<IEventMessage>
{
    public async Task Consume(ConsumeContext<IEventMessage> context)
    {
    }
}

最后,配置消费者使用过滤器:

builder.AddMassTransit(cfg =>
{
    cfg.AddConsumer<EventCodeConsumer>(x =>
        x.ConsumerMessage<IEventMessage>(p => p.UseFilter(new EventCodeFilter<EventCodeConsumer>())));
});