Rebus 监听纯 azure servicebus
Rebus listening to pure azure servicebus
在我的场景中,我在我的应用程序中实现了 Rebus(使用 azure servicebus)(打算使用 sagas 制作 PoC)。
问题是,我正在听的主题只是 azure servicebus,没有花哨的幻想。
我正在订阅主题,将消息移至我的队列并收到此错误:
Rebus.Exceptions.RebusApplicationException: Received message with empty or absent 'rbs2-msg-id' header! All messages must be supplied with an ID . If no ID is present, the message cannot be tracked between delivery attempts, and other stuff would also be much harder to do - therefore, it is a requirement that messages be supplied with an ID.
有没有办法在消息到达队列之前对其进行修饰,将其转换为 Rebus 可以接受的内容?
或者我是否必须有一个单独的总线来处理这些消息并将它们作为符合 Rebus 的消息重新发送到 topic/queue?
该错误消息来自 SimpleRetryStrategyStep
,这通常是 Rebus 中执行的第一步 incoming messages pipeline。
您的一个选择是在接收方的 Rebus 实例中装饰 ITransport
,这将为您提供一个提供消息 ID 的地方。可以这样做:
Configure.With(...)
.Transport(t => {
t.UseAzureServiceBus(...);
t.Decorate(c => new MyTransportDecorator(t.Get<ITransport>()))
})
.Start();
其中 MyTransportDecorator
是一个装饰器,看起来像这样:
class MyTransportDecorator : ITransport
{
readonly ITransport _transport;
public MyTransportDecorator(ITransport transport) => _transport = transport;
public void CreateQueue(string address) => _transport.CreateQueue(address);
public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
=> _transport.Send(destinationAddress, message, context);
public async Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
{
var message = await _transport.Receive(context, cancellationToken);
if (message == null) return null;
ProvideMessageIdSomehow(message);
return message;
}
public string Address => _transport.Address;
}
其中 ProvideMessageIdSomehow
然后添加所需的 header.
在我的场景中,我在我的应用程序中实现了 Rebus(使用 azure servicebus)(打算使用 sagas 制作 PoC)。
问题是,我正在听的主题只是 azure servicebus,没有花哨的幻想。
我正在订阅主题,将消息移至我的队列并收到此错误:
Rebus.Exceptions.RebusApplicationException: Received message with empty or absent 'rbs2-msg-id' header! All messages must be supplied with an ID . If no ID is present, the message cannot be tracked between delivery attempts, and other stuff would also be much harder to do - therefore, it is a requirement that messages be supplied with an ID.
有没有办法在消息到达队列之前对其进行修饰,将其转换为 Rebus 可以接受的内容?
或者我是否必须有一个单独的总线来处理这些消息并将它们作为符合 Rebus 的消息重新发送到 topic/queue?
该错误消息来自 SimpleRetryStrategyStep
,这通常是 Rebus 中执行的第一步 incoming messages pipeline。
您的一个选择是在接收方的 Rebus 实例中装饰 ITransport
,这将为您提供一个提供消息 ID 的地方。可以这样做:
Configure.With(...)
.Transport(t => {
t.UseAzureServiceBus(...);
t.Decorate(c => new MyTransportDecorator(t.Get<ITransport>()))
})
.Start();
其中 MyTransportDecorator
是一个装饰器,看起来像这样:
class MyTransportDecorator : ITransport
{
readonly ITransport _transport;
public MyTransportDecorator(ITransport transport) => _transport = transport;
public void CreateQueue(string address) => _transport.CreateQueue(address);
public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
=> _transport.Send(destinationAddress, message, context);
public async Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
{
var message = await _transport.Receive(context, cancellationToken);
if (message == null) return null;
ProvideMessageIdSomehow(message);
return message;
}
public string Address => _transport.Address;
}
其中 ProvideMessageIdSomehow
然后添加所需的 header.