MassTransit 响应进入同一队列并导致循环
MassTransit Respond going to same queue and causing loop
我们目前正在将 NServiceBus
换成 MassTransit
,我在使用 request/response 模式时遇到了一些困难。
在 NServiceBus
中,我可以在 Handler
中进行回复,然后返回给发送它的客户端。
在 MassTransit
中,似乎响应被发送回它被消费的队列,从而创建了一个循环...
奇怪的是,如果我使用 InMemory 创建总线,并且客户端和消费者都在同一台机器上,我就不会遇到这个问题。
我希望我的客户端能够收到响应,但我的 Consumer
却收到了它,这也很奇怪,因为它没有设置为接收该消息类型...
我是否在客户端的请求设置中遗漏了什么?
客户:
....
IRequestClient<IWorklistRequest, IWorklistResponse> client = CreateRequestClient(busControl, WorklistEndpointUri);
Console.Write("Sending Request");
Task.Run(async () =>
{
IWorklistRequest request = new WorklistRequest
{
CurrentDateFrom = new DateTime(2016, 11, 07)
};
var response = await client.Request(request);
Console.WriteLine("Worklist Items retrieved: {0}", response.ExamItemList.Length);
}).Wait();
....
static IRequestClient<IWorklistRequest, IWorklistResponse> CreateRequestClient(IBusControl busControl, string endpointAddress)
{
Console.WriteLine("Creating Request client...");
var serviceAddress = new Uri(endpointAddress);
IRequestClient<IWorklistRequest, IWorklistResponse> client =
busControl.CreateRequestClient<IWorklistRequest, IWorklistResponse>(serviceAddress, TimeSpan.FromSeconds(10));
return client;
}
消费者:
public Task Consume(ConsumeContext<IWorklistRequest> context)
{
_log.InfoFormat("Received Worklist Request with Id: {0}", context.RequestId);
try
{
var result = _provider.GetAllWorklistsByStartDate(context.Message.CurrentDateFrom);
IWorklistResponse response = new WorklistResponse
{
ExamItemList = result.ToArray()
};
// the below is sending the response right back to the original queue and is getting picked up again by this same consumer
context.Respond(response);
}
catch (Exception ex)
{
_log.Info(ex.Message);
}
return Task.FromResult(0);
}
如果您使用的是 RabbitMQ,并且您使用的是请求客户端,则不应看到此行为。
有一个示例演示了如何在 MassTransit GitHub 存储库上使用请求客户端:https://github.com/MassTransit/Sample-RequestResponse
上面的代码似乎是正确的,Respond()
调用应该使用请求消息中的响应地址,这是一个直接发送到临时总线地址的端点。
这个领域有相当广泛的单元测试覆盖范围,上面的示例已更新并使用最新版本的 MassTransit 进行了验证。您可能会考虑 deleting/recreating 您的 RabbitMQ 虚拟主机和 运行 您的应用程序(首先启动响应服务,以便设置端点)。
我们目前正在将 NServiceBus
换成 MassTransit
,我在使用 request/response 模式时遇到了一些困难。
在 NServiceBus
中,我可以在 Handler
中进行回复,然后返回给发送它的客户端。
在 MassTransit
中,似乎响应被发送回它被消费的队列,从而创建了一个循环...
奇怪的是,如果我使用 InMemory 创建总线,并且客户端和消费者都在同一台机器上,我就不会遇到这个问题。
我希望我的客户端能够收到响应,但我的 Consumer
却收到了它,这也很奇怪,因为它没有设置为接收该消息类型...
我是否在客户端的请求设置中遗漏了什么?
客户:
....
IRequestClient<IWorklistRequest, IWorklistResponse> client = CreateRequestClient(busControl, WorklistEndpointUri);
Console.Write("Sending Request");
Task.Run(async () =>
{
IWorklistRequest request = new WorklistRequest
{
CurrentDateFrom = new DateTime(2016, 11, 07)
};
var response = await client.Request(request);
Console.WriteLine("Worklist Items retrieved: {0}", response.ExamItemList.Length);
}).Wait();
....
static IRequestClient<IWorklistRequest, IWorklistResponse> CreateRequestClient(IBusControl busControl, string endpointAddress)
{
Console.WriteLine("Creating Request client...");
var serviceAddress = new Uri(endpointAddress);
IRequestClient<IWorklistRequest, IWorklistResponse> client =
busControl.CreateRequestClient<IWorklistRequest, IWorklistResponse>(serviceAddress, TimeSpan.FromSeconds(10));
return client;
}
消费者:
public Task Consume(ConsumeContext<IWorklistRequest> context)
{
_log.InfoFormat("Received Worklist Request with Id: {0}", context.RequestId);
try
{
var result = _provider.GetAllWorklistsByStartDate(context.Message.CurrentDateFrom);
IWorklistResponse response = new WorklistResponse
{
ExamItemList = result.ToArray()
};
// the below is sending the response right back to the original queue and is getting picked up again by this same consumer
context.Respond(response);
}
catch (Exception ex)
{
_log.Info(ex.Message);
}
return Task.FromResult(0);
}
如果您使用的是 RabbitMQ,并且您使用的是请求客户端,则不应看到此行为。
有一个示例演示了如何在 MassTransit GitHub 存储库上使用请求客户端:https://github.com/MassTransit/Sample-RequestResponse
上面的代码似乎是正确的,Respond()
调用应该使用请求消息中的响应地址,这是一个直接发送到临时总线地址的端点。
这个领域有相当广泛的单元测试覆盖范围,上面的示例已更新并使用最新版本的 MassTransit 进行了验证。您可能会考虑 deleting/recreating 您的 RabbitMQ 虚拟主机和 运行 您的应用程序(首先启动响应服务,以便设置端点)。