MassTransit (RabbitMq) + Asp.Net 核心 2.1:同一项目中的多个消费者

MassTransit (RabbitMq) + Asp.Net Core 2.1: Multiple consumers in the same project

我有一个 Asp.Net 2.1 项目,它作为服务主机使用其他 processes/applications 发布的消息。我在 Startup class (Startup.cs) 中有 setup/configured 多个消费者,如下所示(为简洁起见,此处仅给出了 MassTransit 部分):

public void ConfigureServices(IServiceCollection services)
{   
    services.AddScoped<SendMessageConsumer>();
    services.AddScoped<AnotherMessageConsumer>();
    services.AddMassTransit(c =>
    {
        c.AddConsumer<SendMessageConsumer>();
        c.AddConsumer<AnotherMessageConsumer>();
    });

    services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host("localhost", "/", h => { });

        cfg.ReceiveEndpoint(host, "Queue-1", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));

            e.LoadFrom(provider);
            e.Consumer<SendMessageConsumer>();
            EndpointConvention.Map<Message>(e.InputAddress);                    
        });

        cfg.ReceiveEndpoint(host, "Queue-2", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));

            e.LoadFrom(provider);
            e.Consumer<AnotherMessageConsumer>();
            EndpointConvention.Map<AnotherMessage>(e.InputAddress);
        });
    }));

    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IHostedService, BusService>();   
}

消息:

namespace MasstransitDemo.Models
{
    public class Message
    {
        public string Value { get; set; }
    }

    public class AnotherMessage
    {
        public string Value { get; set; }
    }
}

消费者:

public class SendMessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        Console.WriteLine($"Receive message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

public class AnotherMessageConsumer : IConsumer<AnotherMessage>
{
    public Task Consume(ConsumeContext<AnotherMessage> context)
    {
        Console.WriteLine($"Receive another message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

这会导致两条消息都到达每个队列。请参阅下面生成的 RabbitMq 交换:

如何设置才能使 SendMessageConsumer 仅接收 "Message" 而 AnotherMessageConsumer 接收 "AnotherMessage"?

提前致谢。

您明确告知 MassTransit 您的消费者,但 从容器中为每个端点加载所有 消费者。

e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();

通过这样做,你们每个端点的所有消费者都使用 LoadFrom,再加上一个消费者 Consumer<T>。因此,您的每个端点都有三个消费者,并且您将两个队列都绑定到两个交换器。

这里不需要使用LoadFromContainer。如果你的consumer有依赖需要容器来解决,你可以这样使用:

e.Consumer<AnotherMessageConsumer>(container);

它对我不起作用。你评论了e.loadfrom(provider)。请分享您的代码片段以了解

services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(configurator =>
    {
        IRabbitMqHost rabbitMqHost=configurator.Host(_busConfiguration.RabbitMqUri, _busConfiguration.Port, _busConfiguration.Vhost,hostConfigurator =>
        {
            hostConfigurator.Username(_busConfiguration.UserName);
            hostConfigurator.Password(_busConfiguration.Password);
        });

        configurator.ReceiveEndpoint(rabbitMqHost,_busConfiguration.GeneratePayLoadQueue, e =>
        {
            e.PrefetchCount = _busConfiguration.PrefetchCount;
            //e.LoadFrom(provider);
            e.Consumer<StagingConsumerService>(provider);
            EndpointConvention.Map<StagingConsumer>(e.InputAddress);
        });

        configurator.ReceiveEndpoint(rabbitMqHost, _busConfiguration.CreateJournalQueue , e =>
        {
            e.PrefetchCount = _busConfiguration.PrefetchFinDocCount;
            //e.LoadFrom(provider);
            e.Consumer<FinDocConsumerService>();
            EndpointConvention.Map<FinDocConsumer>(e.InputAddress);
        });
    }));

    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IHostedService, BusService>();