通过 MassTransit Request/Response 对话扩展订阅者

Scaling Out Subscribers with MassTransit Request/Response conversation

我有一个使用 Masstransit / RabbitMq 实现的企业服务总线,用于 Web 项目。我使用 Request/Response Masstransit 模式在 MQ 上执行 RPC。

我正在处理不同类型消息的总线中创建多个 ReceiveEndpoint。 ESB 也使用自定义配置文件来创建多个总线,因此我可以以某种方式在逻辑上实现 Qos,甚至可以通过网络在单独的服务器上分发消费者,或多或少地提高性能。

最近我看到我的一位消费者被屏蔽了。如果我向一个长 运行 消费者发送一些消息(每条消息大约需要 5 秒的工作时间),它看起来像一个线程响应我的请求并同时发送消息等待之前的消息被消费。

到目前为止,我设置了 UseConcurrencyLimit(64),但没有任何改变,试图将 PrefetchCount 增加到 50,但是 RabbitMq 在队列详细信息中显示它为 0。

为什么消费者一次只处理一条消息?

地铁 v3.5.7

编辑:我后来找到了 this thread。这对我来说似乎是同样的问题。

UPDATE 与 Chris 的示例不同的是,我使用 RabbitMq 并使用反射来创建使用者,这样我就可以使用配置文件管理 ESB。仍在 RabbitMq 管理控制台上看到 Prefetch Count 0。

var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
        {
            x.UseConcurrencyLimit(64);
            IRabbitMqHost host = x.Host(new Uri(Config.host), h =>
            {
                h.Username("userName");
                h.Password("password");
            });

            var obj = Activator.CreateInstance([SomeExistingType]);

            x.ReceiveEndpoint(host, "queueName", e =>
            {
                e.PrefetchCount = 64;//config.prefetchCount;
                e.PurgeOnStartup = true;
                if (config.retryCount > 0)
                {
                    e.UseRetry(retry => retry.Interval(config.retryCount, config.retryInterval));
                }
                e.SetQueueArgument("x-expires", config.timeout * 1000 /*Seconds to milliseconds*/);
                e.SetQueueArgument("temporary", config.temporary);

                e.Consumer(consumer, f => obj);

            }); 

        })

busControl.StartAsync();

更新 2 当我将预取计数设置为 1 以让 MassTransit 处理工作负载时,因为我有多个消费者服务器和一个 RabbitMq 集群。但是,当我将许多消息发送到使所有线程都处于忙碌状态的队列时,新请求会在发送者队列中等待,以供空闲线程接收。我增加了预取计数,这样我就可以为新请求获得更多空闲线程。我按照 Chris 的建议在配置接收端点时设置了预取计数。谢谢。 Chris 确认后,我会立即将 Chris 的回复标记为答案。

如果您在 RabbitMQ 控制台中看到 0 预取计数,您可能在错误的位置配置它。你应该有一些类似的东西:

cfg.ReceiveEndpoint("my-queue", x =>
{
    x.PrefetchCount = 64;
    x.Consumer<MyConsumer>();
});

这会将接收端点消费者配置为预取计数 64,这应该会显示在 RabbitMQ 管理控制台上的消费者中。

另一方面,如果您的使用者使用 Thread.Sleep() 之类的方法阻塞线程,我见过这样的情况会限制线程池的并发性。