MassTransit 在同步上下文中发布消息的速度如此之慢
MassTransit publish messages so slow in synchronous context
我有一种情况,我应该通过 RabbitMQ 同步发布消息(遗留代码),否则它们会乱序,因为 MassTransit 在不同的线程中发布
public void PostUserQuantitySync(int userId, decimal amount)
{
foreach (var item in Enumerable.Range(0, 1000))
{
var _ = _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item });
}
return Ok();
}
所以我使用了 TaskUtil.Await and/or Wait() 但是发布性能很差(每秒 33/s 消息)而纯兔子客户端有更好的结果(200/s 消息至少每秒)并尊重消息排序:
public void PostUserQuantitySync(int userId, decimal amount)
{
foreach (var item in Enumerable.Range(0, 1000))
{
TaskUtil.Await(() _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }, c => c.SetAwaitAck(false)));
}
}
同步上下文中的 MassTransit 是否存在任何性能问题,或者我是否应该在我的代码中进行任何调整?
如果你生产一批消息,按顺序,你不应该每次都等待发布,会很慢。考虑将任务添加到列表中,并同时等待它们:
public void PostUserQuantitySync(int userId, decimal amount)
{
List<Task> tasks = new();
foreach (var item in Enumerable.Range(0, 1000))
{
tasks.Add(_publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }));
}
await Task.WhenAll(tasks);
}
如果您使用的是 RabbitMQ,您可以通过 ConfigureBatchPublish
方法如图:
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", h =>
{
h.ConfigureBatchSettings(b => b.Enabled = false);
});
// ...
});
我有一种情况,我应该通过 RabbitMQ 同步发布消息(遗留代码),否则它们会乱序,因为 MassTransit 在不同的线程中发布
public void PostUserQuantitySync(int userId, decimal amount)
{
foreach (var item in Enumerable.Range(0, 1000))
{
var _ = _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item });
}
return Ok();
}
所以我使用了 TaskUtil.Await and/or Wait() 但是发布性能很差(每秒 33/s 消息)而纯兔子客户端有更好的结果(200/s 消息至少每秒)并尊重消息排序:
public void PostUserQuantitySync(int userId, decimal amount)
{
foreach (var item in Enumerable.Range(0, 1000))
{
TaskUtil.Await(() _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }, c => c.SetAwaitAck(false)));
}
}
同步上下文中的 MassTransit 是否存在任何性能问题,或者我是否应该在我的代码中进行任何调整?
如果你生产一批消息,按顺序,你不应该每次都等待发布,会很慢。考虑将任务添加到列表中,并同时等待它们:
public void PostUserQuantitySync(int userId, decimal amount)
{
List<Task> tasks = new();
foreach (var item in Enumerable.Range(0, 1000))
{
tasks.Add(_publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }));
}
await Task.WhenAll(tasks);
}
如果您使用的是 RabbitMQ,您可以通过 ConfigureBatchPublish
方法如图:
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", h =>
{
h.ConfigureBatchSettings(b => b.Enabled = false);
});
// ...
});