如何使用 EasyNetQ 同步使用来自 RabbitMQ 的原始字节消息?

How to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?

有什么方法可以使用 EasyNetQ 同步使用来自 RabbitMQ 的原始字节消息吗?

我需要保证按顺序处理和确认来自不以 EasyNetQ 格式发布的系统的消息。我知道消费者在单线程上运行,但是 IAdvancedBus 接口只提供一种方法来消费原始消息:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

Task return 类型意味着消费者是 运行 异步回调,因此可能会乱序处理消息。

如果没有,是否有更改代码以支持此功能的想法?我会制作接口方法:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

并在 RabbitAdvancedBus 中实现它,但我不确定代码的确切位置。

这是一个有趣的问题。我自己不是 EasyNetQ 专家,也许其他人会出现并为您提供更好的答案。 但是我已经熟悉 EasyNetQ code base 大约一年了,在我看来,很难了解连接消费者时发生的事情(因此当消费者被调用时)。

首先要指出的是,仅仅通过改变方法的签名,并不能保证消息被按顺序处理。在您建议的接口的这个实现中查找示例:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        return new Task(() => { });
    };
    Consume(queue, taskWrapper);
}

它调用了原始的Consume方法,我们真的不知道之后会发生什么,对吗?

如果我站在你的立场上,我会做以下事情之一:

  1. 使用 Official RabbitMq Client 并使用那里的消息(这并不难!)
  2. 也许看看 RawRabbit, a thin layer above RabbitMq that I've been contributing to (using vNext standards). It only supports async signatures for consuming messages but it shouldn't be to difficult to write a synchronious implementation of Subscriber.cs (using a sync library like AsyncEx)。
  3. 更改业务逻辑的建模。我不确定这是否适用于您的情况,但一般来说,如果以正确的顺序处理每条消息是关键任务,您应该以某种方式对其进行建模,以便使用方法可以验证这条消息是下一条排队。 (此外,我认为 EasyNetQ 不保证消息序列,因此您可能希望为框架的每个新版本验证它)。

希望对您有所帮助!

我收到了在 EasyNetQ Google 组中有效的回复:

要同步执行,您可以这样做:

bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
    // do your synchronous work.....
    return Task.CompletedTask;
});

或添加扩展名:

using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;

namespace ConsoleApplication4
{
    public static class RabbitAdvancedBusConsumeExtension
    {
       public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
    }

    public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
    }

    private static Task ExecuteSynchronously(Action action)
    {
        var tcs = new TaskCompletionSource<object>();
        try
        {
            action();
            tcs.SetResult(null);
        }
        catch (Exception e)
        {
            tcs.SetException(e);
        }
        return tcs.Task;
    }
}

class Program
{
    static void Main(string[] args)
    {
        var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));

        var queue = bus.Advanced.QueueDeclare();
        bus.Advanced.Consume(queue, (bytes, properties, info) =>
        {
            // .....
        });
    }
}
}

更新: 此功能是在版本 0.52.0.410 中添加的:

https://github.com/EasyNetQ/EasyNetQ/pull/505