MassTransit (RabbitMq) + Asp.Net 核心 2.1:同一项目中的多个消费者
MassTransit (RabbitMq) + Asp.Net Core 2.1: Multiple consumers in the same project
我有一个 Asp.Net 2.1 项目,它作为服务主机使用其他 processes/applications 发布的消息。我在 Startup class (Startup.cs) 中有 setup/configured 多个消费者,如下所示(为简洁起见,此处仅给出了 MassTransit 部分):
public void ConfigureServices(IServiceCollection services)
{
services.AddScoped<SendMessageConsumer>();
services.AddScoped<AnotherMessageConsumer>();
services.AddMassTransit(c =>
{
c.AddConsumer<SendMessageConsumer>();
c.AddConsumer<AnotherMessageConsumer>();
});
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host("localhost", "/", h => { });
cfg.ReceiveEndpoint(host, "Queue-1", e =>
{
e.PrefetchCount = 16;
e.UseMessageRetry(x => x.Interval(2, 100));
e.LoadFrom(provider);
e.Consumer<SendMessageConsumer>();
EndpointConvention.Map<Message>(e.InputAddress);
});
cfg.ReceiveEndpoint(host, "Queue-2", e =>
{
e.PrefetchCount = 16;
e.UseMessageRetry(x => x.Interval(2, 100));
e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();
EndpointConvention.Map<AnotherMessage>(e.InputAddress);
});
}));
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();
}
消息:
namespace MasstransitDemo.Models
{
public class Message
{
public string Value { get; set; }
}
public class AnotherMessage
{
public string Value { get; set; }
}
}
消费者:
public class SendMessageConsumer : IConsumer<Message>
{
public Task Consume(ConsumeContext<Message> context)
{
Console.WriteLine($"Receive message value: {context.Message.Value}");
return Task.CompletedTask;
}
}
public class AnotherMessageConsumer : IConsumer<AnotherMessage>
{
public Task Consume(ConsumeContext<AnotherMessage> context)
{
Console.WriteLine($"Receive another message value: {context.Message.Value}");
return Task.CompletedTask;
}
}
这会导致两条消息都到达每个队列。请参阅下面生成的 RabbitMq 交换:
如何设置才能使 SendMessageConsumer 仅接收 "Message" 而 AnotherMessageConsumer 接收 "AnotherMessage"?
提前致谢。
您明确告知 MassTransit 您的消费者,但 还 从容器中为每个端点加载所有 消费者。
e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();
通过这样做,你们每个端点的所有消费者都使用 LoadFrom
,再加上一个消费者 Consumer<T>
。因此,您的每个端点都有三个消费者,并且您将两个队列都绑定到两个交换器。
这里不需要使用LoadFromContainer
。如果你的consumer有依赖需要容器来解决,你可以这样使用:
e.Consumer<AnotherMessageConsumer>(container);
它对我不起作用。你评论了e.loadfrom(provider)
。请分享您的代码片段以了解
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(configurator =>
{
IRabbitMqHost rabbitMqHost=configurator.Host(_busConfiguration.RabbitMqUri, _busConfiguration.Port, _busConfiguration.Vhost,hostConfigurator =>
{
hostConfigurator.Username(_busConfiguration.UserName);
hostConfigurator.Password(_busConfiguration.Password);
});
configurator.ReceiveEndpoint(rabbitMqHost,_busConfiguration.GeneratePayLoadQueue, e =>
{
e.PrefetchCount = _busConfiguration.PrefetchCount;
//e.LoadFrom(provider);
e.Consumer<StagingConsumerService>(provider);
EndpointConvention.Map<StagingConsumer>(e.InputAddress);
});
configurator.ReceiveEndpoint(rabbitMqHost, _busConfiguration.CreateJournalQueue , e =>
{
e.PrefetchCount = _busConfiguration.PrefetchFinDocCount;
//e.LoadFrom(provider);
e.Consumer<FinDocConsumerService>();
EndpointConvention.Map<FinDocConsumer>(e.InputAddress);
});
}));
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();
我有一个 Asp.Net 2.1 项目,它作为服务主机使用其他 processes/applications 发布的消息。我在 Startup class (Startup.cs) 中有 setup/configured 多个消费者,如下所示(为简洁起见,此处仅给出了 MassTransit 部分):
public void ConfigureServices(IServiceCollection services)
{
services.AddScoped<SendMessageConsumer>();
services.AddScoped<AnotherMessageConsumer>();
services.AddMassTransit(c =>
{
c.AddConsumer<SendMessageConsumer>();
c.AddConsumer<AnotherMessageConsumer>();
});
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host("localhost", "/", h => { });
cfg.ReceiveEndpoint(host, "Queue-1", e =>
{
e.PrefetchCount = 16;
e.UseMessageRetry(x => x.Interval(2, 100));
e.LoadFrom(provider);
e.Consumer<SendMessageConsumer>();
EndpointConvention.Map<Message>(e.InputAddress);
});
cfg.ReceiveEndpoint(host, "Queue-2", e =>
{
e.PrefetchCount = 16;
e.UseMessageRetry(x => x.Interval(2, 100));
e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();
EndpointConvention.Map<AnotherMessage>(e.InputAddress);
});
}));
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();
}
消息:
namespace MasstransitDemo.Models
{
public class Message
{
public string Value { get; set; }
}
public class AnotherMessage
{
public string Value { get; set; }
}
}
消费者:
public class SendMessageConsumer : IConsumer<Message>
{
public Task Consume(ConsumeContext<Message> context)
{
Console.WriteLine($"Receive message value: {context.Message.Value}");
return Task.CompletedTask;
}
}
public class AnotherMessageConsumer : IConsumer<AnotherMessage>
{
public Task Consume(ConsumeContext<AnotherMessage> context)
{
Console.WriteLine($"Receive another message value: {context.Message.Value}");
return Task.CompletedTask;
}
}
这会导致两条消息都到达每个队列。请参阅下面生成的 RabbitMq 交换:
如何设置才能使 SendMessageConsumer 仅接收 "Message" 而 AnotherMessageConsumer 接收 "AnotherMessage"?
提前致谢。
您明确告知 MassTransit 您的消费者,但 还 从容器中为每个端点加载所有 消费者。
e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();
通过这样做,你们每个端点的所有消费者都使用 LoadFrom
,再加上一个消费者 Consumer<T>
。因此,您的每个端点都有三个消费者,并且您将两个队列都绑定到两个交换器。
这里不需要使用LoadFromContainer
。如果你的consumer有依赖需要容器来解决,你可以这样使用:
e.Consumer<AnotherMessageConsumer>(container);
它对我不起作用。你评论了e.loadfrom(provider)
。请分享您的代码片段以了解
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(configurator =>
{
IRabbitMqHost rabbitMqHost=configurator.Host(_busConfiguration.RabbitMqUri, _busConfiguration.Port, _busConfiguration.Vhost,hostConfigurator =>
{
hostConfigurator.Username(_busConfiguration.UserName);
hostConfigurator.Password(_busConfiguration.Password);
});
configurator.ReceiveEndpoint(rabbitMqHost,_busConfiguration.GeneratePayLoadQueue, e =>
{
e.PrefetchCount = _busConfiguration.PrefetchCount;
//e.LoadFrom(provider);
e.Consumer<StagingConsumerService>(provider);
EndpointConvention.Map<StagingConsumer>(e.InputAddress);
});
configurator.ReceiveEndpoint(rabbitMqHost, _busConfiguration.CreateJournalQueue , e =>
{
e.PrefetchCount = _busConfiguration.PrefetchFinDocCount;
//e.LoadFrom(provider);
e.Consumer<FinDocConsumerService>();
EndpointConvention.Map<FinDocConsumer>(e.InputAddress);
});
}));
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();