在公共交通中按路由键拆分消息
Split messages by routing key in mass transit
我想根据路由键将消息发送到适当的队列。从生产者应用程序开始,我的代码(相关部分)是:
var options = serviceProvider
.GetService<IConfiguration>()
.GetOptions<RabbitMqProducerOptions>("RabbitMqProducer");
foreach (var option in options?.Endpoints)
{
var method = typeof(EndpointConvention).GetMethod("Map", new[] { typeof(Uri) });
var type = Assembly.Load(option.Assembly).GetTypes().First(t => t.Name == option.Type);
var genericMethod = method.MakeGenericMethod(new[] { type });
genericMethod.Invoke(null, new[] { new Uri($"{options.Address}/{option.Name}") });
}
Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(options.Address);
cfg.Send<CreateProducts>(x => x.UseRoutingKeyFormatter(context => context.Message.Platform));
});
以上没问题 - 它创建了我在配置文件中声明的交换(扇出交换,如果重要的话)。现在消费者配置:
var options = serviceProvider.GetService<IConfiguration>().GetOptions<RabbitMqConsumerOptions>("RabbitMqConsumer");
Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(options.Address);
foreach (var kvp in options.Endpoints)
{
cfg.ReceiveEndpoint("ingest-products", ep =>
{
ep.PrefetchCount = kvp.Value.PrefetchCount;
ep.BindMessageExchanges = false;
ep.UseMessageRetry(r => r.Interval(kvp.Value.RetryCount, kvp.Value.RetryInterval));
ep.Bind("ingest-amazon-products", x => BindForEndpoint(x, kvp.Value.RoutingKey));
BindExchange(ep, kvp.Value.RoutingKey, kvp.Value.Assembly, kvp.Value.Type);
ep.ConfigureConsumers(serviceProvider);
});
}
});
现在,上面的代码可以工作,但不是我想要的方式,因为没有匹配路由键的消息仍然会传送到我的消费者。我的意思是 - 如果 kvp.Value.RoutingKey
配置值为 X,并且生产者生成路由键为 Y 的消息,则监听 X 的消费者将收到 Y 消息。如何解决?
发布赏金后,我发现我应该使用BusFactoryConfigurator
接口的Publish
方法。
我想根据路由键将消息发送到适当的队列。从生产者应用程序开始,我的代码(相关部分)是:
var options = serviceProvider
.GetService<IConfiguration>()
.GetOptions<RabbitMqProducerOptions>("RabbitMqProducer");
foreach (var option in options?.Endpoints)
{
var method = typeof(EndpointConvention).GetMethod("Map", new[] { typeof(Uri) });
var type = Assembly.Load(option.Assembly).GetTypes().First(t => t.Name == option.Type);
var genericMethod = method.MakeGenericMethod(new[] { type });
genericMethod.Invoke(null, new[] { new Uri($"{options.Address}/{option.Name}") });
}
Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(options.Address);
cfg.Send<CreateProducts>(x => x.UseRoutingKeyFormatter(context => context.Message.Platform));
});
以上没问题 - 它创建了我在配置文件中声明的交换(扇出交换,如果重要的话)。现在消费者配置:
var options = serviceProvider.GetService<IConfiguration>().GetOptions<RabbitMqConsumerOptions>("RabbitMqConsumer");
Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(options.Address);
foreach (var kvp in options.Endpoints)
{
cfg.ReceiveEndpoint("ingest-products", ep =>
{
ep.PrefetchCount = kvp.Value.PrefetchCount;
ep.BindMessageExchanges = false;
ep.UseMessageRetry(r => r.Interval(kvp.Value.RetryCount, kvp.Value.RetryInterval));
ep.Bind("ingest-amazon-products", x => BindForEndpoint(x, kvp.Value.RoutingKey));
BindExchange(ep, kvp.Value.RoutingKey, kvp.Value.Assembly, kvp.Value.Type);
ep.ConfigureConsumers(serviceProvider);
});
}
});
现在,上面的代码可以工作,但不是我想要的方式,因为没有匹配路由键的消息仍然会传送到我的消费者。我的意思是 - 如果 kvp.Value.RoutingKey
配置值为 X,并且生产者生成路由键为 Y 的消息,则监听 X 的消费者将收到 Y 消息。如何解决?
发布赏金后,我发现我应该使用BusFactoryConfigurator
接口的Publish
方法。