Masstransit RPC (RabbitMq) 吞吐量限制

Masstransit RPC (RabbitMq) throughput limit

我们将 Masstransit 与 RabbitMq 结合使用,使 RPC 从我们系统的一个组件到其他组件。

最近我们遇到了客户端吞吐量的限制,测得每秒大约有 80 个完成的响应。

在排查问题出在哪里的过程中,我发现RPC服务器处理请求的速度很快,然后将响应放入回调队列,然后队列处理速度为80M\s

此限制仅适用于客户端。在同一台机器上启动同一客户端应用程序的另一个进程会使服务器端的请求吞吐量加倍,但随后我看到两个充满消息的回调队列正在被消耗,每个队列都使用相同的 80 M\s

我们正在使用 IBus 的单个实例

builder.Register(c =>
{
    var busSettings = c.Resolve<RabbitSettings>();
    var busControl = MassTransitBus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(busSettings.Host), h =>
            {
                h.Username(busSettings.Username);
                h.Password(busSettings.Password);
            });

            cfg.UseSerilog();

            cfg.Send<IProcessorContext>(x =>
            {
               x.UseCorrelationId(context => context.Scope.CommandContext.CommandId);
            });

    }
);

return busControl;
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();

发送逻辑如下所示:

var busResponse = await _bus.Request<TRequest, TResult>(
                destinationAddress: _settings.Host.GetServiceUrl<TCommand>(queueType),
                message: commandContext,
                cancellationToken: default(CancellationToken),
                timeout: TimeSpan.FromSeconds(_settings.Timeout),
                callback: p => { p.WithPriority(priority); });

有人遇到过这样的问题吗? 我猜测响应调度逻辑中存在一些程序限制。它可能是最大线程池大小,或者缓冲区的大小,也可能是响应队列的预取计数。 我尝试使用 .Net 线程池大小,但没有任何帮助。

我是 Masstransit 的新手,如果能帮我解决我的问题,我将不胜感激。 希望可以通过配置的方式修复

您可以尝试一些方法来优化性能。我还建议在您的环境中检查 MassTransit-Benchmark 和 运行 - 这会让您了解代理的可能吞吐量。它允许您调整预取计数、并发等设置,以查看它们如何影响您的结果。

此外,我建议使用其中一个请求客户端来减少每个 request/response 的设置。例如,创建一次请求客户端,然后为每个请求使用同一个客户端。

var serviceUrl = yourMethodToGetIt<TRequest>(...); var client = Bus.CreateRequestClient<TRequest>(serviceUrl);

然后,只要您需要执行请求,就使用那个 IRequestClient<TRequest> 实例。

Response<Value> response = await client.GetResponse<TResponse>(new Request());

由于您只是使用 RPC,我强烈建议将接收端点队列设置为非持久队列,以避免将 RPC 请求写入磁盘。并将总线预取计数调整为更高的值(比您可能拥有的最大并发请求数高 2 倍)以确保响应始终直接传递给等待响应的消费者(这是 RabbitMQ 传递消息的内部方式)。

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.PrefetchCount = 1000; }