如何独立于其他消费者配置一些 types/groups 消费者?
How to configure some types/groups of consumers independently of others?
现在我使用以下代码添加所有消费者并配置端点:
services.AddMassTransit(x =>
{
x.AddConsumers(assemblyToLoadConsumersFrom);
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
});
});
但是现在我有不同类型的消费者定义抽象 类 像 CommandResponseConsumerBase
和 QueryConsumerBase
(那些抽象 类 实现 IConsumer
接口,和实际的消费者实现继承自那些 类)。每个消费者实现都需要构造函数中的一些参数由 DI 注入(使用 Microsoft DI 和上面的代码正确地将参数注入消费者构造函数)。
问题是,例如,对于类型为 QueryConsumerBase
的所有消费者,我想设置 endpointConfiguration.DiscardFaultedMessages()
,但为其他消费者(如 CommandResponseConsumerBase
)保留默认值(而不是丢弃)。我还想保留默认拓扑/端点名称,如上面代码分配的名称。
我也不想单独配置消费者,因为我有很多消费者,这会很乏味。相反,我只想在程序集中找到所有 CommandResponseConsumerBase
使用者并注册它们并立即配置它们,然后在程序集中找到所有 QueryConsumerBase
并注册它们并立即配置它们。
代码看起来像这样:
services.AddMassTransit(x =>
{
// x.AddConsumers(assemblyToLoadConsumersFrom);
assemblyToLoadConsumersFrom
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>)
.ForEach(impl =>
{
x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
// but how to configure consumer here?
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
});
});
x.AddConsumer(impl.ImplementedType)
方法允许我添加消费者,但我无法传递 ConsumerTypeDefinition,因为我没有任何定义,我不想为每个消费者定义一个。也不能使用 x.AddConsumer<T>(...)
因为我在动态变量中有消费者类型。还尝试了 x.AddConsumer(impl.ImplementedType).Endpoint(e => e.?)
,但端点配置器只允许更改名称、临时设置和其他一些设置,但不允许像 DiscardFaultedMessages()
.
这样的设置
使用 MassTransit/AspNetCore/RabbitMq 7.2.4
最终解决方案(感谢 Chris):
// Actually this definition class is not even aware of exact
// consumer type.
public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
where TConsumer : class, IConsumer
{
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<TConsumer> consumerConfigurator)
{
endpointConfigurator.DiscardFaultedMessages();
}
}
assemblyToLoadConsumersFrom!
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
.ToList()
.ForEach(impl =>
{
// create definition type that is aware of exact consumer type
var definitionType = typeof(CommandResponseConsumerDefinition<>)
.MakeGenericType(impl.ImplementedType);
// Add consumer type along with definition type
x.AddConsumer(impl.ImplementedType, definitionType);
});
您可以创建两个通用消费者定义,一个用于查询库,一个用于命令库,将定义的实现通用类型与消费者一起注册(调用 AddConsumer
时) .
类似于:
public class QueryConsumerDefinition<TConsumer> :
ConsumerDefinition<TConsumer>
where TConsumer : QueryConsumerBase
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
{
endpointConfigurator.DiscardFaultedMessages();
}
}
现在我使用以下代码添加所有消费者并配置端点:
services.AddMassTransit(x =>
{
x.AddConsumers(assemblyToLoadConsumersFrom);
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
});
});
但是现在我有不同类型的消费者定义抽象 类 像 CommandResponseConsumerBase
和 QueryConsumerBase
(那些抽象 类 实现 IConsumer
接口,和实际的消费者实现继承自那些 类)。每个消费者实现都需要构造函数中的一些参数由 DI 注入(使用 Microsoft DI 和上面的代码正确地将参数注入消费者构造函数)。
问题是,例如,对于类型为 QueryConsumerBase
的所有消费者,我想设置 endpointConfiguration.DiscardFaultedMessages()
,但为其他消费者(如 CommandResponseConsumerBase
)保留默认值(而不是丢弃)。我还想保留默认拓扑/端点名称,如上面代码分配的名称。
我也不想单独配置消费者,因为我有很多消费者,这会很乏味。相反,我只想在程序集中找到所有 CommandResponseConsumerBase
使用者并注册它们并立即配置它们,然后在程序集中找到所有 QueryConsumerBase
并注册它们并立即配置它们。
代码看起来像这样:
services.AddMassTransit(x =>
{
// x.AddConsumers(assemblyToLoadConsumersFrom);
assemblyToLoadConsumersFrom
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>)
.ForEach(impl =>
{
x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
// but how to configure consumer here?
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
});
});
x.AddConsumer(impl.ImplementedType)
方法允许我添加消费者,但我无法传递 ConsumerTypeDefinition,因为我没有任何定义,我不想为每个消费者定义一个。也不能使用 x.AddConsumer<T>(...)
因为我在动态变量中有消费者类型。还尝试了 x.AddConsumer(impl.ImplementedType).Endpoint(e => e.?)
,但端点配置器只允许更改名称、临时设置和其他一些设置,但不允许像 DiscardFaultedMessages()
.
使用 MassTransit/AspNetCore/RabbitMq 7.2.4
最终解决方案(感谢 Chris):
// Actually this definition class is not even aware of exact
// consumer type.
public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
where TConsumer : class, IConsumer
{
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<TConsumer> consumerConfigurator)
{
endpointConfigurator.DiscardFaultedMessages();
}
}
assemblyToLoadConsumersFrom!
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
.ToList()
.ForEach(impl =>
{
// create definition type that is aware of exact consumer type
var definitionType = typeof(CommandResponseConsumerDefinition<>)
.MakeGenericType(impl.ImplementedType);
// Add consumer type along with definition type
x.AddConsumer(impl.ImplementedType, definitionType);
});
您可以创建两个通用消费者定义,一个用于查询库,一个用于命令库,将定义的实现通用类型与消费者一起注册(调用 AddConsumer
时) .
类似于:
public class QueryConsumerDefinition<TConsumer> :
ConsumerDefinition<TConsumer>
where TConsumer : QueryConsumerBase
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
{
endpointConfigurator.DiscardFaultedMessages();
}
}