Using consumer class names as queue names

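In this video by Garry Taylor, between minutes 35:00 and 39:50, he is able to create queues in RabbitMQ that are named after the consumer class names. He does this by calling the ConfigureEndpoints method on the RabbitMQ bus factory configurator and passing the bus registration context to it as an argument, as shown below: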

rmqBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);

I would like to know whether I can achieve the same thing using the ReceiveEndpoint method on the RabbitMQ bus factory configurator.

My setup is as follows:

.NET 6 Web API (publisher):

=============================

Program.cs

    public static IServiceCollection EnableMessagePublisher(this IServiceCollection services)
    {
        services.AddMassTransit(busRegistrationConfigurator =>
        {
            busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
            
            busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
            {
                rmqBusFactoryConfigurator.Host("busbunny", "/", "15672", rmqHostConfigurator =>
                {
                    rmqHostConfigurator.Username("guest");
                    rmqHostConfigurator.Password("guest");
                });
                
                rmqBusFactoryConfigurator.Message<ICreateResourceMessage>(messageTopologyConfigurator =>
                {
                    messageTopologyConfigurator.SetEntityName("ResourceCreator.Exchange");
                });
                
                rmqBusFactoryConfigurator.Publish<ICreateResourceMessage>(rmqMessageTopologyConfigurator =>
                {
                    rmqMessageTopologyConfigurator.ExchangeType = ExchangeType.Fanout;
                });
            });
        });
        return services;
    }

Controller.cs

[ApiController]
[Route("api/resources")]
public class ResourcesController : ControllerBase
{
    private readonly IPublishEndpoint publishEndpoint;

    public ResourcesController(IPublishEndpoint publishEndpoint)
    {
        this.publishEndpoint = publishEndpoint;
    }

    // POST
    [HttpPost]
    public async Task<IActionResult> CreateResource([FromBody]Resource resource)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(modelState: ModelState);
        }
    
        ICreateResourceMessage createResourceMessage =
            new CreateResourceMessage(Guid.NewGuid(), resource.Name, resource.Description);

        await this.publishEndpoint.Publish<ICreateResourceMessage>(createResourceMessage);
    
        return Ok(createResourceMessage);
    }
}

.NET 6 worker service (consumer):

==================================

Consumer.cs

I have two consumers with identical code, so for brevity I include only one.

public class Resource1MessageConsumer : IConsumer<ICreateResourceMessage>
{
    private readonly ILogger<Resource1MessageConsumer> logger;

    public Resource1MessageConsumer(ILogger<Resource1MessageConsumer> logger)
    {
        this.logger = logger;
    }

    public async Task Consume(ConsumeContext<ICreateResourceMessage> context)
    {
        var x = context.Message;
    }
}

Program.cs

    public static IServiceCollection EnableMessageConsumers(this IServiceCollection services)
    {
        services.AddMassTransit(busRegistrationConfigurator =>
        {
            busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
        
            busRegistrationConfigurator.AddConsumer<Resource1MessageConsumer, Resource1MessageConsumerDefinition>();
            busRegistrationConfigurator.AddConsumer<Resource2MessageConsumer, Resource2MessageConsumerDefinition>();
        
            busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
            {
                rmqBusFactoryConfigurator.Host("bugsbunny", "/", "15672", rmqHostConfigurator =>
                {
                    rmqHostConfigurator.Username("guest");
                    rmqHostConfigurator.Password("guest");
                });
            
                rmqBusFactoryConfigurator.ReceiveEndpoint(rmqReceiveEndpointConfigurator =>
                {
                    rmqReceiveEndpointConfigurator.ConfigureConsumer(busRegistrationContext, typeof(Resource1MessageConsumer));
                    rmqReceiveEndpointConfigurator.ConfigureConsumer(busRegistrationContext, typeof(Resource2MessageConsumer));
                    rmqReceiveEndpointConfigurator.Bind("ResourceCreator.Exchange");
                });
            
            });
        });
        return services;
    }

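As I understand it, one can use either the ConfigureEndpoints method or the ReceiveEndpoint method. In my case, ReceiveEndpoint fits what I want to achieve, because I can explicitly bind the consumers to an exchange.

However, I would like the consumers' queues and their associated exchanges (in RabbitMQ) to follow the same naming convention as the consumer classes, i.e. to work exactly as they do with ConfigureEndpoints.

Has anyone managed to do this?

Thanks in advance.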

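You have two consumers on one endpoint, and each consumer has a different name. The code below therefore configures the endpoint using the name of one of the consumers: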

busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
    rmqBusFactoryConfigurator.Host("bugsbunny", 5672, "/", rmqHostConfigurator =>
    {
        rmqHostConfigurator.Username("guest");
        rmqHostConfigurator.Password("guest");
    });

    var formatter = busRegistrationContext.GetService<IEndpointNameFormatter>() 
        ?? DefaultEndpointNameFormatter.Instance;

    var endpointName = formatter.Consumer<Resource1MessageConsumer>();

    rmqBusFactoryConfigurator.ReceiveEndpoint(endpointName, rmqReceiveEndpointConfigurator =>
    {
        rmqReceiveEndpointConfigurator.ConfigureConsumer<Resource1MessageConsumer>(busRegistrationContext);
        rmqReceiveEndpointConfigurator.ConfigureConsumer<Resource2MessageConsumer>(busRegistrationContext);
        rmqReceiveEndpointConfigurator.Bind("ResourceCreator.Exchange");
    });
});
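
The endpoint above hosts both consumers on a single queue named after Resource1MessageConsumer. If each consumer should instead get its own queue named after its own class, which is closer to what ConfigureEndpoints does, one possible sketch is to declare one receive endpoint per consumer. This assumes the consumer types and the exchange name from the question and MassTransit 8 namespaces; the wrapper class name MessageConsumerExtensions is made up for illustration, and the consumer definitions are left out for brevity.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical wrapper class; only the method name comes from the question.
public static class MessageConsumerExtensions
{
    public static IServiceCollection EnableMessageConsumers(this IServiceCollection services)
    {
        services.AddMassTransit(busRegistrationConfigurator =>
        {
            busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();

            // Registered without definitions here to keep the sketch short.
            busRegistrationConfigurator.AddConsumer<Resource1MessageConsumer>();
            busRegistrationConfigurator.AddConsumer<Resource2MessageConsumer>();

            busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
            {
                rmqBusFactoryConfigurator.Host("bugsbunny", 5672, "/", rmqHostConfigurator =>
                {
                    rmqHostConfigurator.Username("guest");
                    rmqHostConfigurator.Password("guest");
                });

                // Same formatter lookup as in the answer's code.
                var formatter = busRegistrationContext.GetService<IEndpointNameFormatter>()
                    ?? DefaultEndpointNameFormatter.Instance;

                // One queue per consumer, each named from its consumer class.
                rmqBusFactoryConfigurator.ReceiveEndpoint(formatter.Consumer<Resource1MessageConsumer>(), endpoint =>
                {
                    endpoint.ConfigureConsumer<Resource1MessageConsumer>(busRegistrationContext);
                    endpoint.Bind("ResourceCreator.Exchange");
                });

                rmqBusFactoryConfigurator.ReceiveEndpoint(formatter.Consumer<Resource2MessageConsumer>(), endpoint =>
                {
                    endpoint.ConfigureConsumer<Resource2MessageConsumer>(busRegistrationContext);
                    endpoint.Bind("ResourceCreator.Exchange");
                });
            });
        });

        return services;
    }
}
```

Because both queues are bound to the same exchange, each consumer should still receive its own copy of every published message.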

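Note that I also fixed the port in your host configuration, since it was wrong: 15672 is the default port of the RabbitMQ management UI, while AMQP clients connect on 5672 by default.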
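
Another option that keeps the ConfigureEndpoints naming is to move the exchange binding into the consumer definitions that the question already registers. The following is only a sketch under several assumptions: the body of Resource1MessageConsumerDefinition is hypothetical (the question does not show it), the namespaces are those of MassTransit 8, and the two-argument ConfigureConsumer override is used (newer MassTransit releases also offer an overload that takes a registration context). The wrapper class name is again made up.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical body for the definition class that the question registers but does not show.
public class Resource1MessageConsumerDefinition : ConsumerDefinition<Resource1MessageConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<Resource1MessageConsumer> consumerConfigurator)
    {
        // The binding is RabbitMQ-specific, so only apply it on a RabbitMQ endpoint.
        if (endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbitMqEndpoint)
        {
            rabbitMqEndpoint.Bind("ResourceCreator.Exchange");
        }
    }
}

// Hypothetical wrapper class; only the method name comes from the question.
public static class MessageConsumerExtensions
{
    public static IServiceCollection EnableMessageConsumers(this IServiceCollection services)
    {
        services.AddMassTransit(busRegistrationConfigurator =>
        {
            busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();

            // Resource2MessageConsumerDefinition is assumed to mirror the definition above.
            busRegistrationConfigurator.AddConsumer<Resource1MessageConsumer, Resource1MessageConsumerDefinition>();
            busRegistrationConfigurator.AddConsumer<Resource2MessageConsumer, Resource2MessageConsumerDefinition>();

            busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
            {
                rmqBusFactoryConfigurator.Host("bugsbunny", 5672, "/", rmqHostConfigurator =>
                {
                    rmqHostConfigurator.Username("guest");
                    rmqHostConfigurator.Password("guest");
                });

                // Creates one endpoint per registered consumer, named by the endpoint name formatter.
                rmqBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
            });
        });

        return services;
    }
}
```

With this arrangement the queue names come from the formatter, as in the video, and the binding to ResourceCreator.Exchange lives next to the consumer it belongs to.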