如何独立于其他消费者配置一些 types/groups 消费者?

How to configure some types/groups of consumers independently of others?

现在我使用以下代码添加所有消费者并配置端点:

services.AddMassTransit(x =>
{
    x.AddConsumers(assemblyToLoadConsumersFrom);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(settings.ConnectionString));

        cfg.ConfigureEndpoints(context);
    });
});

但是现在我有不同类型的消费者定义抽象 类 像 CommandResponseConsumerBaseQueryConsumerBase (那些抽象 类 实现 IConsumer 接口,和实际的消费者实现继承自那些 类)。每个消费者实现都需要构造函数中的一些参数由 DI 注入(使用 Microsoft DI 和上面的代码正确地将参数注入消费者构造函数)。

问题是,例如,对于类型为 QueryConsumerBase 的所有消费者,我想设置 endpointConfiguration.DiscardFaultedMessages(),但为其他消费者(如 CommandResponseConsumerBase)保留默认值(而不是丢弃)。我还想保留默认拓扑/端点名称,如上面代码分配的名称。

我也不想单独配置消费者,因为我有很多消费者,这会很乏味。相反,我只想在程序集中找到所有 CommandResponseConsumerBase 使用者并注册它们并立即配置它们,然后在程序集中找到所有 QueryConsumerBase 并注册它们并立即配置它们。

代码看起来像这样:

services.AddMassTransit(x =>
{
    // x.AddConsumers(assemblyToLoadConsumersFrom);

    assemblyToLoadConsumersFrom
        .GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>)
        .ForEach(impl =>
        {
            x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
            // but how to configure consumer here?
        });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(settings.ConnectionString));

        cfg.ConfigureEndpoints(context);
    });
});

x.AddConsumer(impl.ImplementedType) 方法允许我添加消费者,但我无法传递 ConsumerTypeDefinition,因为我没有任何定义,我不想为每个消费者定义一个。也不能使用 x.AddConsumer<T>(...) 因为我在动态变量中有消费者类型。还尝试了 x.AddConsumer(impl.ImplementedType).Endpoint(e => e.?),但端点配置器只允许更改名称、临时设置和其他一些设置,但不允许像 DiscardFaultedMessages().

这样的设置

使用 MassTransit/AspNetCore/RabbitMq 7.2.4


最终解决方案(感谢 Chris):

// Actually this definition class is not even aware of exact 
// consumer type.

public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
    where TConsumer : class, IConsumer
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<TConsumer> consumerConfigurator)
    {
        endpointConfigurator.DiscardFaultedMessages();
    }
}
assemblyToLoadConsumersFrom!
    .GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
    .ToList()
    .ForEach(impl =>
    {
        // create definition type that is aware of exact consumer type
        var definitionType = typeof(CommandResponseConsumerDefinition<>)
            .MakeGenericType(impl.ImplementedType);

        // Add consumer type along with definition type
        x.AddConsumer(impl.ImplementedType, definitionType);
    });

您可以创建两个通用消费者定义,一个用于查询库,一个用于命令库,将定义的实现通用类型与消费者一起注册(调用 AddConsumer 时) .

类似于:

public class QueryConsumerDefinition<TConsumer> :
    ConsumerDefinition<TConsumer>
    where TConsumer : QueryConsumerBase
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
    {
        endpointConfigurator.DiscardFaultedMessages();
    }
}