我如何在 .Net 中使用不同类型的消费者使用 RabbitMq 消息?
How can I consume RabbitMq messages with different type of consumers in .Net?
消息类型:"PublishX"
消费者:
Type1ConsumerX
Type2ConsumerX
Type3ConsumerX
所有消费者都必须立即捕获消息,但在自己内部同步消费..
例如队列中有 100 "PublishX" 条消息。 Type1ConsumerX 消费了 30 条消息(同步),Type2ConsumerX 消费了 50 条消息(同步),Type3ConsumerX 消费了 100 条消息(同步)。
我怎么知道消息被"all type of consumers"消费了?
可以RabbitMQ/MassTransit向消费者推送消息吗?
能否RabbitMQ/MassTransit以间隔 (1s) 推送消息(合并它们)以减少网络流量?
能否RabbitMQ/MassTransit将相同的消息推送给不同类型的消费者?
如果我对问题的理解正确,您只需设置一个基本的 pub/sub 模式。这将允许您将相同的消息传递给多个消费者。
示例发布商:
public static void PublishMessageToFanout()
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("messages", "fanout");
var message = new Message { Text = "This is a message to send" };
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.BasicPublish("messages", string.Empty, null, body);
}
}
示例消费者:
SubscribeToMessages("sms-messages", (s) => Console.WriteLine("SMS Message: {0}", s));
SubscribeToMessages("email-messages", (s) => Console.WriteLine("Email Message: {0}", s));
public static void SubscribeToMessages(string queueName, Action<string> messageAction)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("messages", "fanout");
channel.QueueDeclare(queueName, true, false, false, null);
channel.QueueBind(queueName, "messages", string.Empty);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
var ea = consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
messageAction(message);
}
}
}
如果您 运行 SubscribeToMessages
在单独的进程或控制台应用程序中循环,您会发现无论何时调用 PublishMessageToFanout
,它们都会打印出消息。您还会看到这两个队列都存在于队列下的 RabbitMQ 管理中。
关于您问题中的公共交通部分
RabbitMQ/MassTransit 向消费者推送消息?
是的,MassTransit 将消息发布到总线,然后消费者处理它们
可以 RabbitMQ/MassTransit 以间隔 (1s) 推送消息(合并它们)以减少网络流量吗?
不知道是否有此功能,您可以自己编写,但必须非常小心丢失消息。
能否RabbitMQ/MassTransit将相同的消息推送给不同类型的消费者?
是的,多个消费者可以消费相同类型的消息。
我编写了一个简单的 hello world 应用程序来展示基础知识 - http://nodogmablog.bryanhogan.net/2015/04/mass-transit-with-rabbitmq-hello-world/
消息类型:"PublishX"
消费者:
Type1ConsumerX
Type2ConsumerX
Type3ConsumerX
所有消费者都必须立即捕获消息,但在自己内部同步消费..
例如队列中有 100 "PublishX" 条消息。 Type1ConsumerX 消费了 30 条消息(同步),Type2ConsumerX 消费了 50 条消息(同步),Type3ConsumerX 消费了 100 条消息(同步)。 我怎么知道消息被"all type of consumers"消费了?
可以RabbitMQ/MassTransit向消费者推送消息吗?
能否RabbitMQ/MassTransit以间隔 (1s) 推送消息(合并它们)以减少网络流量?
能否RabbitMQ/MassTransit将相同的消息推送给不同类型的消费者?
如果我对问题的理解正确,您只需设置一个基本的 pub/sub 模式。这将允许您将相同的消息传递给多个消费者。
示例发布商:
public static void PublishMessageToFanout()
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("messages", "fanout");
var message = new Message { Text = "This is a message to send" };
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.BasicPublish("messages", string.Empty, null, body);
}
}
示例消费者:
SubscribeToMessages("sms-messages", (s) => Console.WriteLine("SMS Message: {0}", s));
SubscribeToMessages("email-messages", (s) => Console.WriteLine("Email Message: {0}", s));
public static void SubscribeToMessages(string queueName, Action<string> messageAction)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("messages", "fanout");
channel.QueueDeclare(queueName, true, false, false, null);
channel.QueueBind(queueName, "messages", string.Empty);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
var ea = consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
messageAction(message);
}
}
}
如果您 运行 SubscribeToMessages
在单独的进程或控制台应用程序中循环,您会发现无论何时调用 PublishMessageToFanout
,它们都会打印出消息。您还会看到这两个队列都存在于队列下的 RabbitMQ 管理中。
关于您问题中的公共交通部分
RabbitMQ/MassTransit 向消费者推送消息? 是的,MassTransit 将消息发布到总线,然后消费者处理它们
可以 RabbitMQ/MassTransit 以间隔 (1s) 推送消息(合并它们)以减少网络流量吗? 不知道是否有此功能,您可以自己编写,但必须非常小心丢失消息。
能否RabbitMQ/MassTransit将相同的消息推送给不同类型的消费者? 是的,多个消费者可以消费相同类型的消息。
我编写了一个简单的 hello world 应用程序来展示基础知识 - http://nodogmablog.bryanhogan.net/2015/04/mass-transit-with-rabbitmq-hello-world/