使用 Consumer Class 名称作为队列名称
Using Consumer Class names as queue names
在 this video by Garry Taylor 分钟 35:00 到 39:50 之间,他能够在 RabbitMQ 中创建基于消费者 class 名称命名的队列。他通过调用 RabbitMQ BusFactory Configurator 上的 ConfigureEndpoints
方法并将总线注册上下文作为参数传递给该方法来做到这一点,如下所示:
rmqBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
我想知道我是否可以使用 RabbitMQ BusFactory 配置器上的 ReceiveEndpoint
方法实现相同的目的。
我的设置如下:
.NET6 WepApi(发布者):
=============================
Program.cs
public static IServiceCollection EnableMessagePublisher(this IServiceCollection services)
{
services.AddMassTransit(busRegistrationConfigurator =>
{
busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
rmqBusFactoryConfigurator.Host("busbunny", "/", "15672", rmqHostConfigurator =>
{
rmqHostConfigurator.Username("guest");
rmqHostConfigurator.Password("guest");
});
rmqBusFactoryConfigurator.Message<ICreateResourceMessage>(messageTopologyConfigurator =>
{
messageTopologyConfigurator.SetEntityName("ResourceCreator.Exchange");
});
rmqBusFactoryConfigurator.Publish<ICreateResourceMessage>(rmqMessageTopologyConfigurator =>
{
rmqMessageTopologyConfigurator.ExchangeType = ExchangeType.Fanout;
});
});
});
return services;
}
Controller.cs
[ApiController]
[Route("api/resources")]
public class ResourcesController : ControllerBase
{
private readonly IPublishEndpoint publishEndpoint;
public ResourcesController(IPublishEndpoint publishEndpoint)
{
this.publishEndpoint = publishEndpoint;
}
// POST
[HttpPost]
public async Task<IActionResult> CreateResource([FromBody]Resource resource)
{
if (!ModelState.IsValid)
{
return BadRequest(modelState: ModelState);
}
ICreateResourceMessage createResourceMessage =
new CreateResourceMessage(Guid.NewGuid(), resource.Name, resource.Description);
await this.publishEndpoint.Publish<ICreateResourceMessage>(createResourceMessage);
return Ok(createResourceMessage);
}
}
.NET6 工作者服务(消费者):
==================================
Consumer.cs
我有两个消费者都具有与下面相同的代码行,因此为了简洁起见,我只包括一个。
public class Resource1MessageConsumer : IConsumer<ICreateResourceMessage>
{
private readonly ILogger<Resource1MessageConsumer> logger;
public Resource1MessageConsumer(ILogger<Resource1MessageConsumer> logger)
{
this.logger = logger;
}
public async Task Consume(ConsumeContext<ICreateResourceMessage> context)
{
var x = context.Message;
}
}
Program.cs
public static IServiceCollection EnableMessageConsumers(this IServiceCollection services)
{
services.AddMassTransit(busRegistrationConfigurator =>
{
busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
busRegistrationConfigurator.AddConsumer<Resource1MessageConsumer, Resource1MessageConsumerDefinition>();
busRegistrationConfigurator.AddConsumer<Resource2MessageConsumer, Resource2MessageConsumerDefinition>();
busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
rmqBusFactoryConfigurator.Host("bugsbunny", "/", "15672", rmqHostConfigurator =>
{
rmqHostConfigurator.Username("guest");
rmqHostConfigurator.Password("guest");
});
rmqBusFactoryConfigurator.ReceiveEndpoint(rmqReceiveEndpointConfigurator =>
{
rmqReceiveEndpointConfigurator.ConfigureConsumer(busRegistrationContext, typeof(Resource1MessageConsumer));
rmqReceiveEndpointConfigurator.ConfigureConsumer(busRegistrationContext, typeof(Resource2MessageConsumer));
rmqReceiveEndpointConfigurator.Bind("ResourceCreator.Exchange");
});
});
});
return services;
}
根据我的理解,可以使用 ConfigureEndpoints
方法或 ReceiveEndpoints
方法。就我而言,ReceiveEndpoints
非常适合我想要实现的目标,因为我可以专门将消费者绑定到交易所。
但是,我希望消费者的队列及其相关交换(在 RabbitMQ 中)与消费者 class 具有相同的命名约定,即它与 ConfigureEndpoints
的工作方式完全相同。
有没有人能做到这一点?
提前致谢。
您在端点上有两个消费者,每个消费者都有不同的名称。因此,下面的代码使用消费者名称之一配置它们:
busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
rmqBusFactoryConfigurator.Host("bugsbunny", 5672, "/", rmqHostConfigurator =>
{
rmqHostConfigurator.Username("guest");
rmqHostConfigurator.Password("guest");
});
var formatter = busRegistrationContext.GetService<IEndpointNameFormatter>()
?? DefaultEndpointNameFormatter.Instance;
var endpointName = formatter.Consumer<Resource1MessageConsumer>();
rmqBusFactoryConfigurator.ReceiveEndpoint(endpointName, rmqReceiveEndpointConfigurator =>
{
rmqReceiveEndpointConfigurator.ConfigureConsumer<Resource1MessageConsumer>(busRegistrationContext);
rmqReceiveEndpointConfigurator.ConfigureConsumer<Resource2MessageConsumer>(busRegistrationContext);
rmqReceiveEndpointConfigurator.Bind("ResourceCreator.Exchange");
});
});
Note, I also fixed the PORT in your host configuration since it was wrong.
在 this video by Garry Taylor 分钟 35:00 到 39:50 之间,他能够在 RabbitMQ 中创建基于消费者 class 名称命名的队列。他通过调用 RabbitMQ BusFactory Configurator 上的 ConfigureEndpoints
方法并将总线注册上下文作为参数传递给该方法来做到这一点,如下所示:
rmqBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
我想知道我是否可以使用 RabbitMQ BusFactory 配置器上的 ReceiveEndpoint
方法实现相同的目的。
我的设置如下:
.NET6 WepApi(发布者):
=============================
Program.cs
public static IServiceCollection EnableMessagePublisher(this IServiceCollection services)
{
services.AddMassTransit(busRegistrationConfigurator =>
{
busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
rmqBusFactoryConfigurator.Host("busbunny", "/", "15672", rmqHostConfigurator =>
{
rmqHostConfigurator.Username("guest");
rmqHostConfigurator.Password("guest");
});
rmqBusFactoryConfigurator.Message<ICreateResourceMessage>(messageTopologyConfigurator =>
{
messageTopologyConfigurator.SetEntityName("ResourceCreator.Exchange");
});
rmqBusFactoryConfigurator.Publish<ICreateResourceMessage>(rmqMessageTopologyConfigurator =>
{
rmqMessageTopologyConfigurator.ExchangeType = ExchangeType.Fanout;
});
});
});
return services;
}
Controller.cs
[ApiController]
[Route("api/resources")]
public class ResourcesController : ControllerBase
{
private readonly IPublishEndpoint publishEndpoint;
public ResourcesController(IPublishEndpoint publishEndpoint)
{
this.publishEndpoint = publishEndpoint;
}
// POST
[HttpPost]
public async Task<IActionResult> CreateResource([FromBody]Resource resource)
{
if (!ModelState.IsValid)
{
return BadRequest(modelState: ModelState);
}
ICreateResourceMessage createResourceMessage =
new CreateResourceMessage(Guid.NewGuid(), resource.Name, resource.Description);
await this.publishEndpoint.Publish<ICreateResourceMessage>(createResourceMessage);
return Ok(createResourceMessage);
}
}
.NET6 工作者服务(消费者):
==================================
Consumer.cs
我有两个消费者都具有与下面相同的代码行,因此为了简洁起见,我只包括一个。
public class Resource1MessageConsumer : IConsumer<ICreateResourceMessage>
{
private readonly ILogger<Resource1MessageConsumer> logger;
public Resource1MessageConsumer(ILogger<Resource1MessageConsumer> logger)
{
this.logger = logger;
}
public async Task Consume(ConsumeContext<ICreateResourceMessage> context)
{
var x = context.Message;
}
}
Program.cs
public static IServiceCollection EnableMessageConsumers(this IServiceCollection services)
{
services.AddMassTransit(busRegistrationConfigurator =>
{
busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
busRegistrationConfigurator.AddConsumer<Resource1MessageConsumer, Resource1MessageConsumerDefinition>();
busRegistrationConfigurator.AddConsumer<Resource2MessageConsumer, Resource2MessageConsumerDefinition>();
busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
rmqBusFactoryConfigurator.Host("bugsbunny", "/", "15672", rmqHostConfigurator =>
{
rmqHostConfigurator.Username("guest");
rmqHostConfigurator.Password("guest");
});
rmqBusFactoryConfigurator.ReceiveEndpoint(rmqReceiveEndpointConfigurator =>
{
rmqReceiveEndpointConfigurator.ConfigureConsumer(busRegistrationContext, typeof(Resource1MessageConsumer));
rmqReceiveEndpointConfigurator.ConfigureConsumer(busRegistrationContext, typeof(Resource2MessageConsumer));
rmqReceiveEndpointConfigurator.Bind("ResourceCreator.Exchange");
});
});
});
return services;
}
根据我的理解,可以使用 ConfigureEndpoints
方法或 ReceiveEndpoints
方法。就我而言,ReceiveEndpoints
非常适合我想要实现的目标,因为我可以专门将消费者绑定到交易所。
但是,我希望消费者的队列及其相关交换(在 RabbitMQ 中)与消费者 class 具有相同的命名约定,即它与 ConfigureEndpoints
的工作方式完全相同。
有没有人能做到这一点?
提前致谢。
您在端点上有两个消费者,每个消费者都有不同的名称。因此,下面的代码使用消费者名称之一配置它们:
busRegistrationConfigurator.UsingRabbitMq((busRegistrationContext, rmqBusFactoryConfigurator) =>
{
rmqBusFactoryConfigurator.Host("bugsbunny", 5672, "/", rmqHostConfigurator =>
{
rmqHostConfigurator.Username("guest");
rmqHostConfigurator.Password("guest");
});
var formatter = busRegistrationContext.GetService<IEndpointNameFormatter>()
?? DefaultEndpointNameFormatter.Instance;
var endpointName = formatter.Consumer<Resource1MessageConsumer>();
rmqBusFactoryConfigurator.ReceiveEndpoint(endpointName, rmqReceiveEndpointConfigurator =>
{
rmqReceiveEndpointConfigurator.ConfigureConsumer<Resource1MessageConsumer>(busRegistrationContext);
rmqReceiveEndpointConfigurator.ConfigureConsumer<Resource2MessageConsumer>(busRegistrationContext);
rmqReceiveEndpointConfigurator.Bind("ResourceCreator.Exchange");
});
});
Note, I also fixed the PORT in your host configuration since it was wrong.