RabbitMQ 消费者中的并发消息处理

Concurrent message processing in RabbitMQ consumer

我是 RabbitMQ 的新手,所以如果我的问题听起来微不足道,请原谅。我想在 RabbitMQ 上发布消息,该消息将由 RabbitMQ 消费者处理。

我的消费者机器是多核机器(最好是 Azure 上的工作者角色)。但是 QueueBasicConsumer 一次推送一条消息。我如何编程以利用所有核心,我可以同时处理多条消息。

  1. 一种解决方案可能是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程数。
  2. 另一种方法是在主线程上读取消息,然后创建任务并将消息传递给该任务。在这种情况下,如果有许多消息(在阈值之后)已经在进行中,我将不得不停止使用消息。不确定如何实施。

提前致谢

您的第二个选项听起来更合理 - 在单个通道上使用并产生多个任务来处理消息。要实现并发控制,您可以使用 semaphore 来控制运行中的任务数。在开始任务之前,您将等待信号量可用,并且在任务完成后,它会向信号量发出信号以允许其他任务 运行.

您没有指定您 language/technology 选择的堆栈,但无论您做什么 - 尝试使用线程池而不是自己创建和管理线程。在 .NET 中,这意味着使用 Task.Run 异步处理消息。

示例 C# 代码:

using (var semaphore = new SemaphoreSlim(MaxMessages))
{
    while (true)
    {
        var args = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        semaphore.Wait();
        Task.Run(() => ProcessMessage(args))
            .ContinueWith(() => semaphore.Release());
    }
}

与其自己控制并发级别,您可能会发现在通道上启用显式 ACK 控制并使用 RabbitMQ Consumer Prefetch 设置未确认消息的最大数量会更容易。这样,您将永远不会一次收到多于您想要的消息。