MassTransit - 多个消费者都可以收到相同的消息吗?
MassTransit - Can Multiple Consumers All Receive Same Message?
我有一个 .NET 4.5.2 服务通过 MassTransit 向 RabbitMq 发布消息。
并且 多个 个 .NET Core 2.1 服务实例使用这些消息。
目前,.NET 核心消费者服务的竞争实例正在窃取其他实例的消息。
即第一个使用消息的人将其从队列中取出,其余服务实例不会使用它。
我希望 所有 个实例使用相同的消息。
我怎样才能做到这一点?
发布者服务配置如下:
builder.Register(context =>
{
MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);
return Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
{
host.Username(***);
host.Password(***);
});
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
});
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();
.NET Core Consumer Services 配置如下:
serviceCollection.AddScoped<MyWrapperConsumer>();
serviceCollection.AddMassTransit(serviceConfigurator =>
{
serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
{
hostConfigurator.Username(***);
hostConfigurator.Password(***);
});
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
}));
});
serviceCollection.AddSingleton<IHostedService, BusService>();
然后 MyWrapperConsumer 看起来像这样:
public class MyWrapperConsumer :
IConsumer<MyWrapper>
{
.
.
public MyWrapperConsumer(...) => (..) = (..);
public async Task Consume(ConsumeContext<MyWrapper> context)
{
//Do Stuff
}
}
听起来您想发布消息并让多个消费者服务实例接收它们。在那种情况下,每个服务实例都需要有自己的队列。这样一来,每条已发布的消息都会将副本传送到每个队列。然后,每个接收端点将从自己的队列中读取该消息并使用它。
您所做的所有过度配置都违背了您的意愿。要使其工作,请删除所有交换类型配置,并为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。
您可以看到 RabbitMQ 拓扑是如何配置的:https://masstransit-project.com/advanced/topology/rabbitmq.html
感谢 Chris Patterson 的回答和 Alexey Zimarev 的评论,我现在相信我已经成功了。
这些人指出(根据我的理解,如果我错了请纠正我)我应该摆脱自己指定 Exchanges 和 Queues 等并停止对我的配置如此细化。
并让 MassTransit 完成工作,根据我的类型了解要创建和发布到哪个交换,以及要创建和绑定到该交换的队列 MyWrapper
。我的 IConsumer
实现类型 MyWrapperConsumer
。
然后为每个消费者服务提供自己唯一的 ReceiveEndpoint
名称,我们最终将交换将 MyWrapper 类型的消息散布到每个由指定的唯一名称创建的唯一队列。
所以,就我而言..
THE PUBLISHER SERVICE 配置相关代码行更改自:
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
至此
configurator.Message<MyWrapper>(x => { });
configurator.AutoDelete = true;
AND EACH CONSUMERS SERVICE 实例配置相关的代码行更改自:
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
对此:
cfg.ReceiveEndpoint(host, Environment.MachineName, queueConfigurator =>
{
queueConfigurator.AutoDelete = true;
queueConfigurator.Consumer<MyWrapperConsumer>(provider);
});
请注意,Environment.MachineName
为每个实例提供了唯一的队列名称
默认情况下,RabbitMQ 将每条消息按顺序发送给所有消费者。这种类型的调度称为 "round-robin" 并用于负载平衡(您可以让多个服务实例使用相同的消息)。
正如 Chris 指出的那样,为确保您的服务始终收到其消息副本,您需要提供唯一的队列名称。
我想分享一个略有不同的代码示例。
实例编号:
Specifies an identifier that uniquely identifies the endpoint
instance, which is appended to the end of the endpoint name.
services.AddMassTransit(x => {
x.SetKebabCaseEndpointNameFormatter();
Guid instanceId = Guid.NewGuid();
x.AddConsumer<MyConsumer>()
.Endpoint(c => c.InstanceId = instanceId.ToString());
x.UsingRabbitMq((context, cfg) => {
...
cfg.ConfigureEndpoints(context);
});
});
我们可以通过为每个消费者服务设置单独的队列并将每个队列与单个交换器绑定来实现它。当我们发布消息进行交换时,它会将消息的副本发送到每个队列并最终由每个消费者服务接收。
留言:
namespace Masstransit.Message
{
public interface ICustomerRegistered
{
Guid Id { get; }
DateTime RegisteredUtc { get; }
string Name { get; }
string Address { get; }
}
}
namespace Masstransit.Message
{
public interface IRegisterCustomer
{
Guid Id { get; }
DateTime RegisteredUtc { get; }
string Name { get; }
string Address { get; }
}
}
发布商控制台应用程序:
namespace Masstransit.Publisher
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("CUSTOMER REGISTRATION COMMAND PUBLISHER");
Console.Title = "Publisher window";
RunMassTransitPublisher();
}
private static void RunMassTransitPublisher()
{
string rabbitMqAddress = "rabbitmq://localhost:5672";
string rabbitMqQueue = "mycompany.domains.queues";
Uri rabbitMqRootUri = new Uri(rabbitMqAddress);
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(rabbitMqRootUri, settings =>
{
settings.Password("guest");
settings.Username("guest");
});
});
Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
ISendEndpoint sendEndpoint = sendEndpointTask.Result;
Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
{
Address = "New Street",
Id = Guid.NewGuid(),
RegisteredUtc = DateTime.UtcNow,
Name = "Nice people LTD"
}, c =>
{
c.FaultAddress = new Uri("rabbitmq://localhost:5672/accounting/mycompany.queues.errors.newcustomers");
});
Console.ReadKey();
}
}
}
接收器管理控制台应用程序:
namespace Masstransit.Receiver.Management
{
class Program
{
static void Main(string[] args)
{
Console.Title = "Management consumer";
Console.WriteLine("MANAGEMENT");
RunMassTransitReceiver();
}
private static void RunMassTransitReceiver()
{
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Password("guest");
settings.Username("guest");
});
rabbit.ReceiveEndpoint("mycompany.domains.queues.events.mgmt", conf =>
{
conf.Consumer<CustomerRegisteredConsumerMgmt>();
});
});
rabbitBusControl.Start();
Console.ReadKey();
rabbitBusControl.Stop();
}
}
}
Receiver Sales Console 应用程序:
namespace Masstransit.Receiver.Sales
{
class Program
{
static void Main(string[] args)
{
Console.Title = "Sales consumer";
Console.WriteLine("SALES");
RunMassTransitReceiver();
}
private static void RunMassTransitReceiver()
{
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Password("guest");
settings.Username("guest");
});
rabbit.ReceiveEndpoint("mycompany.domains.queues.events.sales", conf =>
{
conf.Consumer<CustomerRegisteredConsumerSls>();
});
});
rabbitBusControl.Start();
Console.ReadKey();
rabbitBusControl.Stop();
}
}
}
您可以在 https://github.com/prasantj409/Masstransit-PublishMultipleConsumer.git
上找到可行的解决方案
您需要做的事情:
- 确保您的消费者使用相同的通用类型
实现IConsumer
接口
- 注册所有这些消费者
- 使用
Publish
方式发送消息
MassTransit 中通常有两种类型的消息:事件和命令,在这种情况下,您的消息是事件。如果您的消息是命令,只有一个消费者接收消息,您需要使用 Send
方法。
事件 DTO 示例:
public class OrderChecked
{
public Guid OrderId { get; set; }
}
消费者:
public class OrderSuccessfullyCheckedConsumer : IConsumer<OrderChecked>
{
public async Task Consume(ConsumeContext<OrderChecked> context)
{
// some your consuming code
}
}
public class OrderSuccessfullyCheckedConsumer2 : IConsumer<OrderChecked>
{
public async Task Consume(ConsumeContext<OrderChecked> context)
{
// some your second consuming code
}
}
正在配置:
services.AddMassTransit(c =>
{
c.AddConsumer<OrderSuccessfullyCheckedConsumer>();
c.AddConsumer<OrderSuccessfullyCheckedConsumer2>();
c.SetKebabCaseEndpointNameFormatter();
c.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService(true);
发布消息:
var endpoint = await _bus.GetPublishSendEndpoint<OrderChecked>();
await endpoint.Send(new OrderChecked
{
OrderId = newOrder.Id
});
我有一个 .NET 4.5.2 服务通过 MassTransit 向 RabbitMq 发布消息。
并且 多个 个 .NET Core 2.1 服务实例使用这些消息。
目前,.NET 核心消费者服务的竞争实例正在窃取其他实例的消息。
即第一个使用消息的人将其从队列中取出,其余服务实例不会使用它。
我希望 所有 个实例使用相同的消息。
我怎样才能做到这一点?
发布者服务配置如下:
builder.Register(context =>
{
MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);
return Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
{
host.Username(***);
host.Password(***);
});
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
});
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();
.NET Core Consumer Services 配置如下:
serviceCollection.AddScoped<MyWrapperConsumer>();
serviceCollection.AddMassTransit(serviceConfigurator =>
{
serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
{
hostConfigurator.Username(***);
hostConfigurator.Password(***);
});
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
}));
});
serviceCollection.AddSingleton<IHostedService, BusService>();
然后 MyWrapperConsumer 看起来像这样:
public class MyWrapperConsumer :
IConsumer<MyWrapper>
{
.
.
public MyWrapperConsumer(...) => (..) = (..);
public async Task Consume(ConsumeContext<MyWrapper> context)
{
//Do Stuff
}
}
听起来您想发布消息并让多个消费者服务实例接收它们。在那种情况下,每个服务实例都需要有自己的队列。这样一来,每条已发布的消息都会将副本传送到每个队列。然后,每个接收端点将从自己的队列中读取该消息并使用它。
您所做的所有过度配置都违背了您的意愿。要使其工作,请删除所有交换类型配置,并为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。
您可以看到 RabbitMQ 拓扑是如何配置的:https://masstransit-project.com/advanced/topology/rabbitmq.html
感谢 Chris Patterson 的回答和 Alexey Zimarev 的评论,我现在相信我已经成功了。
这些人指出(根据我的理解,如果我错了请纠正我)我应该摆脱自己指定 Exchanges 和 Queues 等并停止对我的配置如此细化。
并让 MassTransit 完成工作,根据我的类型了解要创建和发布到哪个交换,以及要创建和绑定到该交换的队列 MyWrapper
。我的 IConsumer
实现类型 MyWrapperConsumer
。
然后为每个消费者服务提供自己唯一的 ReceiveEndpoint
名称,我们最终将交换将 MyWrapper 类型的消息散布到每个由指定的唯一名称创建的唯一队列。
所以,就我而言..
THE PUBLISHER SERVICE 配置相关代码行更改自:
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
至此
configurator.Message<MyWrapper>(x => { });
configurator.AutoDelete = true;
AND EACH CONSUMERS SERVICE 实例配置相关的代码行更改自:
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
对此:
cfg.ReceiveEndpoint(host, Environment.MachineName, queueConfigurator =>
{
queueConfigurator.AutoDelete = true;
queueConfigurator.Consumer<MyWrapperConsumer>(provider);
});
请注意,Environment.MachineName
为每个实例提供了唯一的队列名称
默认情况下,RabbitMQ 将每条消息按顺序发送给所有消费者。这种类型的调度称为 "round-robin" 并用于负载平衡(您可以让多个服务实例使用相同的消息)。 正如 Chris 指出的那样,为确保您的服务始终收到其消息副本,您需要提供唯一的队列名称。
我想分享一个略有不同的代码示例。 实例编号:
Specifies an identifier that uniquely identifies the endpoint instance, which is appended to the end of the endpoint name.
services.AddMassTransit(x => {
x.SetKebabCaseEndpointNameFormatter();
Guid instanceId = Guid.NewGuid();
x.AddConsumer<MyConsumer>()
.Endpoint(c => c.InstanceId = instanceId.ToString());
x.UsingRabbitMq((context, cfg) => {
...
cfg.ConfigureEndpoints(context);
});
});
我们可以通过为每个消费者服务设置单独的队列并将每个队列与单个交换器绑定来实现它。当我们发布消息进行交换时,它会将消息的副本发送到每个队列并最终由每个消费者服务接收。
留言:
namespace Masstransit.Message
{
public interface ICustomerRegistered
{
Guid Id { get; }
DateTime RegisteredUtc { get; }
string Name { get; }
string Address { get; }
}
}
namespace Masstransit.Message
{
public interface IRegisterCustomer
{
Guid Id { get; }
DateTime RegisteredUtc { get; }
string Name { get; }
string Address { get; }
}
}
发布商控制台应用程序:
namespace Masstransit.Publisher
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("CUSTOMER REGISTRATION COMMAND PUBLISHER");
Console.Title = "Publisher window";
RunMassTransitPublisher();
}
private static void RunMassTransitPublisher()
{
string rabbitMqAddress = "rabbitmq://localhost:5672";
string rabbitMqQueue = "mycompany.domains.queues";
Uri rabbitMqRootUri = new Uri(rabbitMqAddress);
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(rabbitMqRootUri, settings =>
{
settings.Password("guest");
settings.Username("guest");
});
});
Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
ISendEndpoint sendEndpoint = sendEndpointTask.Result;
Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
{
Address = "New Street",
Id = Guid.NewGuid(),
RegisteredUtc = DateTime.UtcNow,
Name = "Nice people LTD"
}, c =>
{
c.FaultAddress = new Uri("rabbitmq://localhost:5672/accounting/mycompany.queues.errors.newcustomers");
});
Console.ReadKey();
}
}
}
接收器管理控制台应用程序:
namespace Masstransit.Receiver.Management
{
class Program
{
static void Main(string[] args)
{
Console.Title = "Management consumer";
Console.WriteLine("MANAGEMENT");
RunMassTransitReceiver();
}
private static void RunMassTransitReceiver()
{
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Password("guest");
settings.Username("guest");
});
rabbit.ReceiveEndpoint("mycompany.domains.queues.events.mgmt", conf =>
{
conf.Consumer<CustomerRegisteredConsumerMgmt>();
});
});
rabbitBusControl.Start();
Console.ReadKey();
rabbitBusControl.Stop();
}
}
}
Receiver Sales Console 应用程序:
namespace Masstransit.Receiver.Sales
{
class Program
{
static void Main(string[] args)
{
Console.Title = "Sales consumer";
Console.WriteLine("SALES");
RunMassTransitReceiver();
}
private static void RunMassTransitReceiver()
{
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Password("guest");
settings.Username("guest");
});
rabbit.ReceiveEndpoint("mycompany.domains.queues.events.sales", conf =>
{
conf.Consumer<CustomerRegisteredConsumerSls>();
});
});
rabbitBusControl.Start();
Console.ReadKey();
rabbitBusControl.Stop();
}
}
}
您可以在 https://github.com/prasantj409/Masstransit-PublishMultipleConsumer.git
上找到可行的解决方案您需要做的事情:
- 确保您的消费者使用相同的通用类型 实现
- 注册所有这些消费者
- 使用
Publish
方式发送消息
IConsumer
接口
MassTransit 中通常有两种类型的消息:事件和命令,在这种情况下,您的消息是事件。如果您的消息是命令,只有一个消费者接收消息,您需要使用 Send
方法。
事件 DTO 示例:
public class OrderChecked
{
public Guid OrderId { get; set; }
}
消费者:
public class OrderSuccessfullyCheckedConsumer : IConsumer<OrderChecked>
{
public async Task Consume(ConsumeContext<OrderChecked> context)
{
// some your consuming code
}
}
public class OrderSuccessfullyCheckedConsumer2 : IConsumer<OrderChecked>
{
public async Task Consume(ConsumeContext<OrderChecked> context)
{
// some your second consuming code
}
}
正在配置:
services.AddMassTransit(c =>
{
c.AddConsumer<OrderSuccessfullyCheckedConsumer>();
c.AddConsumer<OrderSuccessfullyCheckedConsumer2>();
c.SetKebabCaseEndpointNameFormatter();
c.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService(true);
发布消息:
var endpoint = await _bus.GetPublishSendEndpoint<OrderChecked>();
await endpoint.Send(new OrderChecked
{
OrderId = newOrder.Id
});