MassTransit 响应进入同一队列并导致循环

MassTransit Respond going to same queue and causing loop

我们目前正在将 NServiceBus 换成 MassTransit,我在使用 request/response 模式时遇到了一些困难。

NServiceBus 中,我可以在 Handler 中进行回复,然后返回给发送它的客户端。

MassTransit 中,似乎响应被发送回它被消费的队列,从而创建了一个循环...

奇怪的是,如果我使用 InMemory 创建总线,并且客户端和消费者都在同一台机器上,我就不会遇到这个问题。

我希望我的客户端能够收到响应,但我的 Consumer 却收到了它,这也很奇怪,因为它没有设置为接收该消息类型...

我是否在客户端的请求设置中遗漏了什么?

客户:

....
IRequestClient<IWorklistRequest, IWorklistResponse> client = CreateRequestClient(busControl, WorklistEndpointUri);

Console.Write("Sending Request");

Task.Run(async () =>
{
    IWorklistRequest request = new WorklistRequest
    {
        CurrentDateFrom = new DateTime(2016, 11, 07)
    };

    var response = await client.Request(request);

    Console.WriteLine("Worklist Items retrieved: {0}", response.ExamItemList.Length);

}).Wait();
....



static IRequestClient<IWorklistRequest, IWorklistResponse> CreateRequestClient(IBusControl busControl, string endpointAddress)
{
    Console.WriteLine("Creating Request client...");

    var serviceAddress = new Uri(endpointAddress);
    IRequestClient<IWorklistRequest, IWorklistResponse> client =
    busControl.CreateRequestClient<IWorklistRequest, IWorklistResponse>(serviceAddress, TimeSpan.FromSeconds(10));

    return client;
}

消费者:

    public Task Consume(ConsumeContext<IWorklistRequest> context)
    {
        _log.InfoFormat("Received Worklist Request with Id: {0}", context.RequestId);


        try
        {
            var result = _provider.GetAllWorklistsByStartDate(context.Message.CurrentDateFrom);

            IWorklistResponse response = new WorklistResponse
            {
                ExamItemList = result.ToArray()
            };

            // the below is sending the response right back to the original queue and is getting picked up again by this same consumer
            context.Respond(response);
        }
        catch (Exception ex)
        {
            _log.Info(ex.Message);
        }

        return Task.FromResult(0);
    }

如果您使用的是 RabbitMQ,并且您使用的是请求客户端,则不应看到此行为。

有一个示例演示了如何在 MassTransit GitHub 存储库上使用请求客户端:https://github.com/MassTransit/Sample-RequestResponse

上面的代码似乎是正确的,Respond() 调用应该使用请求消息中的响应地址,这是一个直接发送到临时总线地址的端点。

这个领域有相当广泛的单元测试覆盖范围,上面的示例已更新并使用最新版本的 MassTransit 进行了验证。您可能会考虑 deleting/recreating 您的 RabbitMQ 虚拟主机和 运行 您的应用程序(首先启动响应服务,以便设置端点)。