使用 Rebus 将错误消息从错误队列移回原始队列

Moving Error message from error queue back to original queue with Rebus

是否可以通过编程方式或通过 UI 将错误消息从错误队列移动到其原始队列?

更新

关于下面代码的问题:

1 以下代码是否适用于发布者或订阅者或两者?

代码如下:

 Configure.With(activator)
    .Transport(t => (...)) //< use queue "error" here
    .Routing(r =>
    {
        r.AddTransportMessageForwarder(async transportMessage =>
        {
            var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                ? result
                : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");

            return ForwardAction.ForwardTo(sourceQueue);
        });
    })
    .Start();

2 Transport 下面的方法适用于我的代码。但是,上面的代码建议使用 error 队列名称,可以吗?

如果使用上面的代码,像下面这样的发布者和订阅者队列名称在哪里指定?

请提供发布子模式的代码。

出版商:

.Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Publisher))

订阅者:

.Transport(t=>t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Subscriber1))

https://github.com/rebus-org/Rebus/wiki/Transport-message-forwarding

由于 Rebus 使用普通队列作为它的 dead-letter 队列,所以很容易以 error 作为输入队列来启动一个总线实例——然后你可以,例如使用 Rebus 的 built-in transport message forwarding 功能对消息执行您想要的操作 – 例如将它们转发到它们的源队列:

Configure.With(activator)
    .Transport(t => (...)) //< use queue "error" here
    .Routing(r =>
    {
        r.AddTransportMessageForwarder(async transportMessage =>
        {
            var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                ? result
                : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");

            return ForwardAction.ForwardTo(sourceQueue);
        });
    })
    .Start();

或者你想要的任何东西。

还有一个 UI、Fleet Manager, that can do this – it replaces the need for dead-letter queues entirely, as it stores failed messages in its database and makes it possible to return the failed messages to their source queues (or another queue, if that's what you want), but it's only available if you're a Rebus Pro 订阅者。


更新(包含更新中问题的答案):

1) AddTransportMessageForwarder 仅与接收消息的端点相关。

2) 它是 "queue name" 指定为 .Useblablabla 方法的参数。例如,对于 Azure 服务总线,它将读取

.Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, "error"))