在公共交通中使用 _error 队列

Consuming _error queue in masstransit

对于每个队列 masstransit 都有消费者,它会自动创建一个 [queuename]_error 队列,并将无法处理的消息移到那里(重试后等)

我正在尝试创建一个消费者,它从该队列中获取错误并将其写入数据库。

为了使用这些消息,我必须为错误队列创建一个 handler/consumer,接收原始消息。

cfg.ReceiveEndpoint(host, "myqueuename", e =>
{
    e.Handler<MyMessage>(ctx => 
    {
        throw new Exception ("Not expected");
    });
});

cfg.ReceiveEndpoint(host, "myqueuename_error", e =>
{
    e.BindMessageExchanges = false;
    e.Handler<MyMessage>(ctx =>
    {
        Console.WriteLine("Handled");
        // do whatever
        return ctx.CompleteTask;
    });
});

一切正常,问题是检索实际发生的异常。 我实际上能够做到这一点,有一些严重的黑客....

e.Handler<MyMessage>(m =>
{
    var buffer = m.ReceiveContext.TransportHeaders
                    .GetAll().Single(s => s.Key == "MT-Fault-Message").Value as byte[];
    var errorText = new StreamReader(new MemoryStream(buffer)).ReadToEnd();
    Console.WriteLine($"Handled, Error={errorText}");
    return m.CompleteTask;
});

不过那是错误的。

PS:我知道我可以订阅一个 Fault 事件,但在这种特殊情况下,它是一个 RequestClient(请求-响应)模式,并且 MT 重定向 FaultAddress回到客户端,我不能保证它仍然是 运行。

Request/reply 应该只用于获取数据。这意味着如果请求者出现故障 - 没有更多理由回复数据或错误,并且您没有兴趣使用错误。

因此,请求客户端使用临时(非持久)队列而不是接收端点队列的原因是设计使然。鼓励你不要理解你的回复范围只在请求等待时间内。

如果您发送命令并需要通知命令是否已被处理 - 您应该发布事件以通知命令处理的结果。使用消息元数据(发起者 ID 和对话 ID)可以让您了解事件与命令的关联方式。

因此,仅使用request/reply来使用decoupled invocation SOA pattern请求信息(查询),其中回复仅具有与请求相关的意义,如果请求者宕机,则回复不再需要,不管是成功还是失败。