MassTransit Round-Robin 余额不断向同一个队列发送消息,即使它很忙
MassTransit Round-Robin balance keeps sending message to the same queue even though it's busy
我正在尝试使用 RabbitMQ 为我的应用程序设置 MassTransit。
我正在使用 CastleWindsor DI 和 .Net Framework 4.7。
我在 Program.cs 中注册消费者和总线的方式是:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitHostName);
x.AddConsumer<FactAddedFirstHandler>();
x.AddConsumer<FactAddedSecondHandler>();
cfg.ReceiveEndpoint("addedFactQueue", ec =>
{
cfg.ConfigureEndpoints(container);
});
}));
});
IBusControl busControl = container.Kernel.Resolve<IBusControl>();
busControl.StartAsync();
一个处理程序的例子是:
internal class FactAddedFirstHandler : IConsumer<FactAddedEvent>
{
public async Task Consume(ConsumeContext<FactAddedEvent> context)
{
//Do stuff
}
}
让我们将此主要 运行ning 应用程序简称为 Application。
我想要实现的是两件不同的事情:
1) 每当(从另一个服务)抛出 FactAddedEvent 时,Application 就会将消息传递给两个消费者:FactAddedFirstHandler 和 FactAddedSecondHandler。
这是正确发生的,因为如果我理解正确,MassTransit 会创建一个交换并将消息传递给所有订阅该消息的消费者。
2) 如果我 运行 2 个 Application 实例(我们称它们为 Application1 和 Application2 ) 然后应用循环法。这意味着 RabbitMQ 将消息平衡到免费的 Application 实例。
这 部分 正确。这意味着如果我有两个 istances up 和 运行ning 并尝试发送一个 FactAddedEvent 然后一次事件被 Application1 接收还有一次 Application2。
此外,如果我将 Application2 关闭,则所有消息都将由 Application1 接收。到目前为止,一切正常。
问题: 当我自愿阻止时出现的问题,例如 Application1。假设我发送了 FactAddedEvent,它被 Application1 接收,但是在处理程序的使用方法中我设置了一个断点并在那里等待。所以我在这里自愿屏蔽Application1。现在我期望的是,由于 Application1 被阻止,RabbitMQ 将仅将我抛出的所有下一个事件传递给 Application2。相反,它只是做的是:一旦 RabbitMQ 将它发送到 Application1 一旦它发送到 Application2,无论如何。因此,在 Application1 中,即使 Application2 可以自由工作,也会有一排消息堆积起来。
我做错了什么?提前致谢
RabbitMQ 通道有一个 PrefetchCount,它是代理在暂停以等待这些消息被确认之前发送给消费者的未确认消息的数量。默认情况下,MassTransit 将其设置为 16,或 CPU 核心数 x 2,以较大者为准。此设置适用于每个接收端点。
当 运行 多个实例时,代理将根据其算法决定 channel/consumer 接收消息。规则是通过可用的预取直接将消息传递给消费者,以避免初始写入磁盘。一旦达到所有消费者预取限制,消息就会被缓冲。
即使在断点,服务器认为你还活着,所以它会继续发送消息,直到你填满。
根据您使用的配置,您可以create a ConsumerDefinition
for your consumer to change that prefetch value for the receive endpoint. You can also just configure the consumer inline, using the .Endpoint()
语法:
x.AddConsumer<FactAddedFirstHandler>()
.Endpoint(x => x.PrefetchCount = 1);
这个数字真的很低,我不建议在生产中使用这个值,但对于您的测试来说没问题。
希望这能为您提供一些工具,让事情按您期望的方式进行。
我正在尝试使用 RabbitMQ 为我的应用程序设置 MassTransit。
我正在使用 CastleWindsor DI 和 .Net Framework 4.7。
我在 Program.cs 中注册消费者和总线的方式是:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitHostName);
x.AddConsumer<FactAddedFirstHandler>();
x.AddConsumer<FactAddedSecondHandler>();
cfg.ReceiveEndpoint("addedFactQueue", ec =>
{
cfg.ConfigureEndpoints(container);
});
}));
});
IBusControl busControl = container.Kernel.Resolve<IBusControl>();
busControl.StartAsync();
一个处理程序的例子是:
internal class FactAddedFirstHandler : IConsumer<FactAddedEvent>
{
public async Task Consume(ConsumeContext<FactAddedEvent> context)
{
//Do stuff
}
}
让我们将此主要 运行ning 应用程序简称为 Application。 我想要实现的是两件不同的事情:
1) 每当(从另一个服务)抛出 FactAddedEvent 时,Application 就会将消息传递给两个消费者:FactAddedFirstHandler 和 FactAddedSecondHandler。
这是正确发生的,因为如果我理解正确,MassTransit 会创建一个交换并将消息传递给所有订阅该消息的消费者。
2) 如果我 运行 2 个 Application 实例(我们称它们为 Application1 和 Application2 ) 然后应用循环法。这意味着 RabbitMQ 将消息平衡到免费的 Application 实例。
这 部分 正确。这意味着如果我有两个 istances up 和 运行ning 并尝试发送一个 FactAddedEvent 然后一次事件被 Application1 接收还有一次 Application2。 此外,如果我将 Application2 关闭,则所有消息都将由 Application1 接收。到目前为止,一切正常。
问题: 当我自愿阻止时出现的问题,例如 Application1。假设我发送了 FactAddedEvent,它被 Application1 接收,但是在处理程序的使用方法中我设置了一个断点并在那里等待。所以我在这里自愿屏蔽Application1。现在我期望的是,由于 Application1 被阻止,RabbitMQ 将仅将我抛出的所有下一个事件传递给 Application2。相反,它只是做的是:一旦 RabbitMQ 将它发送到 Application1 一旦它发送到 Application2,无论如何。因此,在 Application1 中,即使 Application2 可以自由工作,也会有一排消息堆积起来。
我做错了什么?提前致谢
RabbitMQ 通道有一个 PrefetchCount,它是代理在暂停以等待这些消息被确认之前发送给消费者的未确认消息的数量。默认情况下,MassTransit 将其设置为 16,或 CPU 核心数 x 2,以较大者为准。此设置适用于每个接收端点。
当 运行 多个实例时,代理将根据其算法决定 channel/consumer 接收消息。规则是通过可用的预取直接将消息传递给消费者,以避免初始写入磁盘。一旦达到所有消费者预取限制,消息就会被缓冲。
即使在断点,服务器认为你还活着,所以它会继续发送消息,直到你填满。
根据您使用的配置,您可以create a ConsumerDefinition
for your consumer to change that prefetch value for the receive endpoint. You can also just configure the consumer inline, using the .Endpoint()
语法:
x.AddConsumer<FactAddedFirstHandler>()
.Endpoint(x => x.PrefetchCount = 1);
这个数字真的很低,我不建议在生产中使用这个值,但对于您的测试来说没问题。
希望这能为您提供一些工具,让事情按您期望的方式进行。