MessageBus:等待处理完成并向请求者发送ACK

MessageBus: wait when processing is done and send ACK to requestor

我们使用外部 TCP/IP 接口,其中一项要求是保持连接打开,等待处理完成并发送 ACK 和结果。

假设我们要使用 MessageBus (masstransit/nservicebus) 与处理模块通信并跟踪消息状态:已接收、正在处理、成功、失败,实现该目标的最佳方法是什么?

具体来说,当消息到达 handler/consumer 时,它如何知道 TCP/IP 连接?我应该将它存储在一些自定义容器中并将其注入消费者吗?

感谢任何指导。谢谢。

消费者将知道如何启动和管理 TCP 连接生命周期。

收到消息后,处理程序可以调用代码,根据消息数据执行某些操作。无论这涉及在某处屏幕上显示绿色大象还是打开端口、进行调用然后处理 ACK,都不会改变您处理消息的方式。

负责执行操作的实际代码可以打包成类似 nuget 包的东西,并通过某种通用接口公开,如果这会让您更开心的话,但这与具有双重角色的组件并不矛盾作为该消息的消息消费者和处理器。

A new instance of the consumer will be created for each message receiving. Also, in my case, consumer can’t initiate TCP/IP connection, it has been already opened earlier (and stored somewhere else) and consumer needs just have access to use it.

抱歉,我应该更仔细地阅读你原来的问题。

有一个从 NServiceBus 共享访问资源的解决方案,如文档所述 here

public class SomeEventHandler : IHandleMessages<SomeEvent>
{
    private IMakeTcpCall _caller;

    public SomeEventHandler(IMakeTcpCalls caller)
    {
        _caller = caller;
    }

    public Task Handle(SomeEvent message, IMessageHandlerContext context)
    {
        // Use the caller 
        var ack = _caller.Call(message.SomeData);

        // Do something with ack
        ...

        return Task.CompletedTask;
    }
}

你最好有一个 DI 容器来管理 IMakeTcpCall 实例的生命周期作为一个单例(虽然这在大容量场景中可能会很奇怪),这样你就可以重新使用打开的 TCP 通道。

例如,在Castle Windsor中:

Component.For<IMakeTcpCalls>().ImplementedBy<MyThreadsafeTcpCaller>().LifestyleSingleton();

温莎城堡 integrates 与 NServiceBus