为一个消费者使用 EasyNetQ 多个处理程序不起作用
using EasyNetQ multiple Handler for one consumer does not work
我们正在使用 RabbitMQ 在 C# .Net(EasyNetQ 客户端)中对消息进行排队。
我想要一个消费者应用程序(C# 控制台应用程序)侦听一个队列并为每种消息类型提供多个处理程序。
我实现了这个场景,我的代码在这里:
using (var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100")
.Advanced)
{
var queue = advancedBus.QueueDeclare("MyQueue");
advancedBus.Consume(queue, x => x
.Add<MessageType1>((message, info) =>
{
Console.WriteLine("MessageType1 Body : " + message.Body.Body);
})
.Add<MessageType2>((message, info) =>
{
Console.WriteLine(" MessageType2 Body: " + message.Body.Body);
}).ThrowOnNoMatchingHandler = false);
}
我的问题:
但是当我执行这个消费者时,它什么也不做。不要发生任何事情。
我像这样将消息发布到该队列:
using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
{
var queue = advancedBus.QueueDeclare("MyQueue");
if (advancedBus.IsConnected)
advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, false,
new Message<MessageType1>(change));
else
result = false;
}
这是什么问题。
好的,测试这段代码后,问题如下:
首先,您要在注册消费后立即处理 advancedBus
。您需要记住,当您调用 IAdvanceBus.Consume
、 时,您只是为每条消息注册一个回调。如果您在注册后立即处理总线,则无法调用您的委托,因为连接已经关闭。因此,您将删除 rabbit 声明周围的 using
语句(完成后不要忘记处理它):
var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100").Advanced
其次,immediate
标志 has been deprecated and shouldn't be used,消息似乎没有进入队列。将 Publish
更改为:
advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
new Message<MessageType1>(change));
此外,如果您是 运行 来自控制台应用程序的,请不要忘记使用 Console.ReadKey
这样您的主线程就不会终止。
这是一个工作代码示例:
static void Main()
{
var change = new MessageType1();
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;
ConsumeMessage(advancedBus);
var queue = advancedBus.QueueDeclare("MyQueue");
if (advancedBus.IsConnected)
{
advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
new Message<MessageType1>(change));
}
else
{
Console.WriteLine("Can't connect");
}
Console.ReadKey();
}
private static void ConsumeMessage(IAdvancedBus advancedBus)
{
var queue = advancedBus.QueueDeclare("MyQueue");
advancedBus.Consume(queue, registration =>
{
registration.Add<MessageType1>((message, info) =>
{
Console.WriteLine("Body: {0}", message.Body);
});
});
}
我们正在使用 RabbitMQ 在 C# .Net(EasyNetQ 客户端)中对消息进行排队。
我想要一个消费者应用程序(C# 控制台应用程序)侦听一个队列并为每种消息类型提供多个处理程序。
我实现了这个场景,我的代码在这里:
using (var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100")
.Advanced)
{
var queue = advancedBus.QueueDeclare("MyQueue");
advancedBus.Consume(queue, x => x
.Add<MessageType1>((message, info) =>
{
Console.WriteLine("MessageType1 Body : " + message.Body.Body);
})
.Add<MessageType2>((message, info) =>
{
Console.WriteLine(" MessageType2 Body: " + message.Body.Body);
}).ThrowOnNoMatchingHandler = false);
}
我的问题: 但是当我执行这个消费者时,它什么也不做。不要发生任何事情。
我像这样将消息发布到该队列:
using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
{
var queue = advancedBus.QueueDeclare("MyQueue");
if (advancedBus.IsConnected)
advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, false,
new Message<MessageType1>(change));
else
result = false;
}
这是什么问题。
好的,测试这段代码后,问题如下:
首先,您要在注册消费后立即处理 advancedBus
。您需要记住,当您调用 IAdvanceBus.Consume
、 时,您只是为每条消息注册一个回调。如果您在注册后立即处理总线,则无法调用您的委托,因为连接已经关闭。因此,您将删除 rabbit 声明周围的 using
语句(完成后不要忘记处理它):
var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100").Advanced
其次,immediate
标志 has been deprecated and shouldn't be used,消息似乎没有进入队列。将 Publish
更改为:
advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
new Message<MessageType1>(change));
此外,如果您是 运行 来自控制台应用程序的,请不要忘记使用 Console.ReadKey
这样您的主线程就不会终止。
这是一个工作代码示例:
static void Main()
{
var change = new MessageType1();
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;
ConsumeMessage(advancedBus);
var queue = advancedBus.QueueDeclare("MyQueue");
if (advancedBus.IsConnected)
{
advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
new Message<MessageType1>(change));
}
else
{
Console.WriteLine("Can't connect");
}
Console.ReadKey();
}
private static void ConsumeMessage(IAdvancedBus advancedBus)
{
var queue = advancedBus.QueueDeclare("MyQueue");
advancedBus.Consume(queue, registration =>
{
registration.Add<MessageType1>((message, info) =>
{
Console.WriteLine("Body: {0}", message.Body);
});
});
}