如何使用 EasyNetQ 同步使用来自 RabbitMQ 的原始字节消息?
How to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?
有什么方法可以使用 EasyNetQ 同步使用来自 RabbitMQ 的原始字节消息吗?
我需要保证按顺序处理和确认来自不以 EasyNetQ 格式发布的系统的消息。我知道消费者在单线程上运行,但是 IAdvancedBus
接口只提供一种方法来消费原始消息:
IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
Task
return 类型意味着消费者是 运行 异步回调,因此可能会乱序处理消息。
如果没有,是否有更改代码以支持此功能的想法?我会制作接口方法:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);
并在 RabbitAdvancedBus
中实现它,但我不确定代码的确切位置。
这是一个有趣的问题。我自己不是 EasyNetQ 专家,也许其他人会出现并为您提供更好的答案。 但是我已经熟悉 EasyNetQ code base 大约一年了,在我看来,很难了解连接消费者时发生的事情(因此当消费者被调用时)。
首先要指出的是,仅仅通过改变方法的签名,并不能保证消息被按顺序处理。在您建议的接口的这个实现中查找示例:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
{
onMessage(bytes, properties, info);
return new Task(() => { });
};
Consume(queue, taskWrapper);
}
它调用了原始的Consume
方法,我们真的不知道之后会发生什么,对吗?
如果我站在你的立场上,我会做以下事情之一:
- 使用 Official RabbitMq Client 并使用那里的消息(这并不难!)
- 也许看看 RawRabbit, a thin layer above RabbitMq that I've been contributing to (using vNext standards). It only supports async signatures for consuming messages but it shouldn't be to difficult to write a synchronious implementation of
Subscriber.cs
(using a sync library like AsyncEx)。
- 更改业务逻辑的建模。我不确定这是否适用于您的情况,但一般来说,如果以正确的顺序处理每条消息是关键任务,您应该以某种方式对其进行建模,以便使用方法可以验证这条消息是下一条排队。 (此外,我认为 EasyNetQ 不保证消息序列,因此您可能希望为框架的每个新版本验证它)。
希望对您有所帮助!
我收到了在 EasyNetQ Google 组中有效的回复:
要同步执行,您可以这样做:
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// do your synchronous work.....
return Task.CompletedTask;
});
或添加扩展名:
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;
namespace ConsoleApplication4
{
public static class RabbitAdvancedBusConsumeExtension
{
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
}
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
}
private static Task ExecuteSynchronously(Action action)
{
var tcs = new TaskCompletionSource<object>();
try
{
action();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
return tcs.Task;
}
}
class Program
{
static void Main(string[] args)
{
var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
var queue = bus.Advanced.QueueDeclare();
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// .....
});
}
}
}
更新: 此功能是在版本 0.52.0.410 中添加的:
有什么方法可以使用 EasyNetQ 同步使用来自 RabbitMQ 的原始字节消息吗?
我需要保证按顺序处理和确认来自不以 EasyNetQ 格式发布的系统的消息。我知道消费者在单线程上运行,但是 IAdvancedBus
接口只提供一种方法来消费原始消息:
IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
Task
return 类型意味着消费者是 运行 异步回调,因此可能会乱序处理消息。
如果没有,是否有更改代码以支持此功能的想法?我会制作接口方法:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);
并在 RabbitAdvancedBus
中实现它,但我不确定代码的确切位置。
这是一个有趣的问题。我自己不是 EasyNetQ 专家,也许其他人会出现并为您提供更好的答案。 但是我已经熟悉 EasyNetQ code base 大约一年了,在我看来,很难了解连接消费者时发生的事情(因此当消费者被调用时)。
首先要指出的是,仅仅通过改变方法的签名,并不能保证消息被按顺序处理。在您建议的接口的这个实现中查找示例:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
{
onMessage(bytes, properties, info);
return new Task(() => { });
};
Consume(queue, taskWrapper);
}
它调用了原始的Consume
方法,我们真的不知道之后会发生什么,对吗?
如果我站在你的立场上,我会做以下事情之一:
- 使用 Official RabbitMq Client 并使用那里的消息(这并不难!)
- 也许看看 RawRabbit, a thin layer above RabbitMq that I've been contributing to (using vNext standards). It only supports async signatures for consuming messages but it shouldn't be to difficult to write a synchronious implementation of
Subscriber.cs
(using a sync library like AsyncEx)。 - 更改业务逻辑的建模。我不确定这是否适用于您的情况,但一般来说,如果以正确的顺序处理每条消息是关键任务,您应该以某种方式对其进行建模,以便使用方法可以验证这条消息是下一条排队。 (此外,我认为 EasyNetQ 不保证消息序列,因此您可能希望为框架的每个新版本验证它)。
希望对您有所帮助!
我收到了在 EasyNetQ Google 组中有效的回复:
要同步执行,您可以这样做:
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// do your synchronous work.....
return Task.CompletedTask;
});
或添加扩展名:
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;
namespace ConsoleApplication4
{
public static class RabbitAdvancedBusConsumeExtension
{
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
}
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
}
private static Task ExecuteSynchronously(Action action)
{
var tcs = new TaskCompletionSource<object>();
try
{
action();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
return tcs.Task;
}
}
class Program
{
static void Main(string[] args)
{
var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
var queue = bus.Advanced.QueueDeclare();
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// .....
});
}
}
}
更新: 此功能是在版本 0.52.0.410 中添加的: