为什么我有以下错误(MassTransit)
Why do I have the following error (MassTransit)
错误是 -
ConfigurationException:已添加具有相同密钥的接收端点:事件
我有 appsettings.Development.json 和
"EventsBusOptions": {
"HostUri": "rabbitmq://rabbitmq.test.com/gate",
"UserName": "xxx",
"Password": "xxxxxx",
"QueueName": "events", //<<< if is change queue name some different string e.g. "events1" - NO error
"PrefetchCount": 16,
"UseConcurrencyLimit": 15
}
和Startup.cs(使用多总线配置)
services.AddMassTransit<IEventsBus>(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
{
h.Username(_options.UserName);
h.Password(_options.Password);
});
cfg.ReceiveEndpoint(_options.QueueName, ep =>
{
ep.Consumer<EventsConsumer>(context);
ep.PrefetchCount = _options.PrefetchCount ?? 15;
ep.UseConcurrencyLimit(_options.UseConcurrencyLimit ?? 16);
});
cfg.ConfigureEndpoints(context);
});
x.AddConsumer<EventsConsumer>();
});
为什么我在使用 "QueueName": "events" 时出现错误?
因为您使用错误的方法来配置消费者。应使用 ConfigureConsumer
而不仅仅是 Consumer
,如下面的更新配置所示。
services.AddMassTransit<IEventsBus>(x =>
{
x.AddConsumer<EventsConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
{
h.Username(_options.UserName);
h.Password(_options.Password);
});
cfg.ReceiveEndpoint(_options.QueueName, ep =>
{
ep.PrefetchCount = _options.PrefetchCount ?? 16;
ep.ConcurrentMessageLimit = _options.ConcurrentMessageLimit ?? 16;
ep.ConfigureConsumer<EventsConsumer>(context);
});
cfg.ConfigureEndpoints(context);
});
});
注意我还修复了您的并发消息限制配置以使用内置限制器,而不是添加过滤器。
另外 您可以停止 ConfigureEndpoints
因为您正在为消费者手动配置接收端点。
错误是 - ConfigurationException:已添加具有相同密钥的接收端点:事件
我有 appsettings.Development.json 和
"EventsBusOptions": {
"HostUri": "rabbitmq://rabbitmq.test.com/gate",
"UserName": "xxx",
"Password": "xxxxxx",
"QueueName": "events", //<<< if is change queue name some different string e.g. "events1" - NO error
"PrefetchCount": 16,
"UseConcurrencyLimit": 15
}
和Startup.cs(使用多总线配置)
services.AddMassTransit<IEventsBus>(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
{
h.Username(_options.UserName);
h.Password(_options.Password);
});
cfg.ReceiveEndpoint(_options.QueueName, ep =>
{
ep.Consumer<EventsConsumer>(context);
ep.PrefetchCount = _options.PrefetchCount ?? 15;
ep.UseConcurrencyLimit(_options.UseConcurrencyLimit ?? 16);
});
cfg.ConfigureEndpoints(context);
});
x.AddConsumer<EventsConsumer>();
});
为什么我在使用 "QueueName": "events" 时出现错误?
因为您使用错误的方法来配置消费者。应使用 ConfigureConsumer
而不仅仅是 Consumer
,如下面的更新配置所示。
services.AddMassTransit<IEventsBus>(x =>
{
x.AddConsumer<EventsConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
cfg.Host(new Uri(_options.HostUri), h =>
{
h.Username(_options.UserName);
h.Password(_options.Password);
});
cfg.ReceiveEndpoint(_options.QueueName, ep =>
{
ep.PrefetchCount = _options.PrefetchCount ?? 16;
ep.ConcurrentMessageLimit = _options.ConcurrentMessageLimit ?? 16;
ep.ConfigureConsumer<EventsConsumer>(context);
});
cfg.ConfigureEndpoints(context);
});
});
注意我还修复了您的并发消息限制配置以使用内置限制器,而不是添加过滤器。
另外 您可以停止 ConfigureEndpoints
因为您正在为消费者手动配置接收端点。