如何通过 MassTransit 和 RabbitMQ 发送各种命令类型?
How to send various command type by MassTransit and RabbitMQ?
我是使用消息代理的初学者。
我们有一个有多个子服务的票务服务。主管服务借助 Web API 获取请求并将它们发送到子服务。
任何请求都有一个 header 用于检测命令类型(例如 Reserve、Refund、Availability 等)。我们使用 json 来序列化 objects.
现在,如何通过 MassTransit 从发布者(例如我们的监管系统)发送各种消息类型(不同 objects),以便消费者可以轻松使用它?
一般来说,MassTransit 和 rabbitMQ 是否可以发送各种类型的消息?
每个消费者只有一个 queue 用于处理收到的消息。
谢谢
Update
https://dotnetcodr.com/2016/08/02/messaging-with-rabbitmq-and-net-review-part-1-foundations-and-terminology/
我阅读了这篇文章,适合从使用 MassTransit 进行消息传递开始,但没有看到在这些资源和其他资源上使用各种消息类型的任何示例:
我有多个命令,需要各种消息类型来发送,但在示例中只使用如下消息类型:
发件人
private static void RunMassTransitPublisherWithRabbit()
{
string rabbitMqAddress = "rabbitmq://localhost:5672/Ticket";
string rabbitMqQueue = "mycompany.domains.queues";
Uri rabbitMqRootUri = new Uri(rabbitMqAddress);
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(rabbitMqRootUri, settings =>
{
settings.Password("Kalcho^Milano");
settings.Username("ticketadmin");
});
});
Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
ISendEndpoint sendEndpoint = sendEndpointTask.Result;
Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
{
Address = "New Street",
Id = Guid.NewGuid(),
Preferred = true,
RegisteredUtc = DateTime.UtcNow,
Name = "Nice people LTD",
Type = 1,
DefaultDiscount = 0
});
Console.ReadKey();
}
接收器
private static void RunMassTransitReceiverWithRabbit()
{
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
IRabbitMqHost rabbitMqHost = rabbit.Host(new Uri("rabbitmq://localhost:5672/Ticket"), settings =>
{
settings.Password("Kalcho^Milano");
settings.Username("ticketadmin");
});
rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
{
conf.Consumer<RegisterCustomerConsumer>();
});
});
rabbitBusControl.Start();
Console.ReadKey();
rabbitBusControl.Stop();
}
IRegisterCustomer
是一个接口,我只能在rabbit.ReceiveEndpoint
中获取消息内容并转换为可用的object。
现在,如何使用IReserveTicket
、IRefundTicket
、IGetAvailability
等多种消息类型来收发消息?
再次感谢
如果您向端点添加更多消费者,就像这样
rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
{
conf.Consumer<RegisterCustomerConsumer>();
conf.Consumer<ReserveTicketConsumer>();
conf.Consumer<RefundTicketConsumer>();
});
并发送类似
的消息
await endpoint.Send<IReserveTicket>(new { TickedId = 123 });
它会起作用的。
以上解决方案假设您的负载不重,尤其是负载不均,您会收到数百万条一种类型的消息,并且可能有数百种其他类型的消息。将所有这些都放在一个端点中会造成消费失衡,因为所有这些消费者只有一个队列。在这种情况下,没有什么能阻止您定义所需数量的端点,每个端点都应该有一个单独的队列。例如:
cfg.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.lowvolume",
c =>
{
c.Consumer<RegisterCustomerConsumer>();
c.Consumer<RefundTicketConsumer>();
});
cfg.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.highvolume",
c => c.Consumer<ReserveTicketConsumer>();
请记住,由于您有不同的队列,因此您需要使用这些地址来获取发送端点。
我是使用消息代理的初学者。
我们有一个有多个子服务的票务服务。主管服务借助 Web API 获取请求并将它们发送到子服务。
任何请求都有一个 header 用于检测命令类型(例如 Reserve、Refund、Availability 等)。我们使用 json 来序列化 objects.
现在,如何通过 MassTransit 从发布者(例如我们的监管系统)发送各种消息类型(不同 objects),以便消费者可以轻松使用它?
一般来说,MassTransit 和 rabbitMQ 是否可以发送各种类型的消息?
每个消费者只有一个 queue 用于处理收到的消息。
谢谢
Update
https://dotnetcodr.com/2016/08/02/messaging-with-rabbitmq-and-net-review-part-1-foundations-and-terminology/
我阅读了这篇文章,适合从使用 MassTransit 进行消息传递开始,但没有看到在这些资源和其他资源上使用各种消息类型的任何示例:
我有多个命令,需要各种消息类型来发送,但在示例中只使用如下消息类型:
发件人
private static void RunMassTransitPublisherWithRabbit()
{
string rabbitMqAddress = "rabbitmq://localhost:5672/Ticket";
string rabbitMqQueue = "mycompany.domains.queues";
Uri rabbitMqRootUri = new Uri(rabbitMqAddress);
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
rabbit.Host(rabbitMqRootUri, settings =>
{
settings.Password("Kalcho^Milano");
settings.Username("ticketadmin");
});
});
Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
ISendEndpoint sendEndpoint = sendEndpointTask.Result;
Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
{
Address = "New Street",
Id = Guid.NewGuid(),
Preferred = true,
RegisteredUtc = DateTime.UtcNow,
Name = "Nice people LTD",
Type = 1,
DefaultDiscount = 0
});
Console.ReadKey();
}
接收器
private static void RunMassTransitReceiverWithRabbit()
{
IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
IRabbitMqHost rabbitMqHost = rabbit.Host(new Uri("rabbitmq://localhost:5672/Ticket"), settings =>
{
settings.Password("Kalcho^Milano");
settings.Username("ticketadmin");
});
rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
{
conf.Consumer<RegisterCustomerConsumer>();
});
});
rabbitBusControl.Start();
Console.ReadKey();
rabbitBusControl.Stop();
}
IRegisterCustomer
是一个接口,我只能在rabbit.ReceiveEndpoint
中获取消息内容并转换为可用的object。
现在,如何使用IReserveTicket
、IRefundTicket
、IGetAvailability
等多种消息类型来收发消息?
再次感谢
如果您向端点添加更多消费者,就像这样
rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
{
conf.Consumer<RegisterCustomerConsumer>();
conf.Consumer<ReserveTicketConsumer>();
conf.Consumer<RefundTicketConsumer>();
});
并发送类似
的消息await endpoint.Send<IReserveTicket>(new { TickedId = 123 });
它会起作用的。
以上解决方案假设您的负载不重,尤其是负载不均,您会收到数百万条一种类型的消息,并且可能有数百种其他类型的消息。将所有这些都放在一个端点中会造成消费失衡,因为所有这些消费者只有一个队列。在这种情况下,没有什么能阻止您定义所需数量的端点,每个端点都应该有一个单独的队列。例如:
cfg.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.lowvolume",
c =>
{
c.Consumer<RegisterCustomerConsumer>();
c.Consumer<RefundTicketConsumer>();
});
cfg.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.highvolume",
c => c.Consumer<ReserveTicketConsumer>();
请记住,由于您有不同的队列,因此您需要使用这些地址来获取发送端点。