NServiceBus 批量处理消息

NServiceBus Handle Messages as a Batch

我正在寻找后端消息处理中出现的常见模式:

ServiceA 生成大量消息。

ServiceB 一次处理一条消息。

ServiceC 发出对数据库的调用或 Web 服务调用,通过批量调用获得显着的性能和可靠性收益。

在某些情况下,预先批处理来自 ServiceA 的消息或在 ServiceB 中成批处理消息是不可行的,因此首选单独处理所有消息,直到最终调用 ServiceC。这需要在调用 ServiceC 之前执行批处理步骤。

看起来理想的是拥有一个 NServiceBus 处理程序签名,可以选择成批传递消息,例如:

public void Handle(FooMessage[] messageBatch)
{
}

其中 none messageBatch 中的消息被提交,直到处理程序执行。

这似乎在 NServiceBus 中不受原生支持。我可以一次处理队列外的消息并写入内存,直到批量刷新。但在这种情况下,消息是在刷新之前提交的,如果进程崩溃,我们不会保留批处理中所有消息的传递保证。

所以问题是:由于某种我没有想到的原因,这是一个糟糕的模式吗?我知道知道何时刷新批处理存在一个固有问题,但似乎至少有一些传输实现在批处理中缓冲消息已经在幕后,并且只是一次传递一个。在此级别进行批处理或为定期刷新设置一个简单的超时似乎可行。

是否有我缺少的解决方法或首选模式?

Upfront/Disclaimer:我在 NServiceBus 的制造商 Particular Software 工作。我也写了Learning NServiceBus.

历史

在我为 Particular 工作之前,我曾经发现自己和你的情况一模一样。我有一个分析类型的情况,其中 12 个 Web 服务器通过 MSMQ 发送相同类型的命令以指示一篇文章被查看。需要在数据库中跟踪这些计数,以便可以根据观看次数生成 "most popular" 列表。但是从每个页面视图插入的效果并不好,所以我引入了服务总线。

插入器可以使用 table-valued 参数一次插入多达 50-100 条消息,但 NServiceBus 在一个事务中一次只给你一条消息。

为什么不使用 Saga?

在 NServiceBus 中,任何对多条消息进行操作的操作通常都需要使用 Saga。 (Saga 基本上是一堆相关的消息处理程序,在处理每条消息之间保持一些存储状态。)

但是 Saga 必须将其数据存储在某个地方,这通常意味着数据库。那么让我们比较一下:

  • 现在使用 NServiceBus,50 条消息意味着 50 次数据库插入。
  • 对于假设的批量接收,50 条消息意味着 1 次数据库批量插入。
  • 对于 Sagas,50 条消息意味着 50 次 Saga 数据读取 + 50 次 Saga 数据更新,然后是单个数据库批量插入。

所以 Saga 使 "persistence load" 变得更糟。

当然,您可以选择对 Saga 使用 in-memory 持久化。这将为您提供批处理而无需额外的持久性开销,但如果 Saga 端点崩溃,您可能会丢失部分批处理。因此,如果您不愿意丢失数据,那不是一个选择。

批量接收是什么样子的?

所以甚至几年前,我就想象过这样的事情:

// Not a real NServiceBus thing! Only exists in my imagination!
public interface IHandleMessageBatches<TMessage>
{
    void Handle(TMessage[] messages);
    int MaxBatchSize { get; }
}

我们的想法是,如果消息传输可以向前看并看到许多可用消息,它可以开始接收 MaxBatchSize 的消息,并且您可以一次接收所有消息。当然,如果 queue 中只有 1 条消息,您会得到一个包含 1 条消息的数组。

问题

几年前我坐下来研究 NServiceBus 代码库,认为我会尝试实现它。好吧,我失败了。当时,尽管 MSMQ 是唯一的传输(在 NServiceBus V3 中),API 的架构使得传输代码可以查看 queue 并一次拉出一条消息,引发 in-memory 消息处理逻辑启动的事件。如果不进行大量重大更改,就不可能更改它。

较新版本中的代码更加模块化,这在很大程度上是因为现在支持多消息传输。但是,仍然假设一次处理一条消息。

进入 V6 的当前实现是在 IPushMessages 接口中。在 Initialize 方法中,Core 将 Func<PushContext, Task> pipe 推送到传输的 IPushMessages.

实现中

或者英文,"Hey Transport, when you have a message available, execute this to hand it over to the Core and we'll take it from there."

简而言之,这是因为 NServiceBus 旨在一次可靠地处理一条消息。从更详细的角度来看,批处理接收困难的原因有很多:

  • 进行交易时,接收一批需要处理该交易中的所有消息。如果事务变得太大,这很容易失控。
  • queue 中可以混合消息类型。消息类型毕竟只是一个header。没办法说 "Give me a batch of messages of type T." 如果你收到一个批次并且它包含 other 消息类型怎么办?
  • 多个处理程序可以 运行 处理同一消息类型。例如,如果消息 SuperMessage 继承 BaseMessage,两种类型的处理程序可以 运行 处理同一消息。当考虑一批消息时,这种多处理程序和多态消息处理程序的可能性变得非常复杂。
  • 关于多态消息的更多信息,如果批处理是 Handle(BaseMessage[] batch) 但传入的消息是从 BaseMessage 继承的不同超类型怎么办?
  • 我敢肯定,还有很多我什至没有想到的事情。

总而言之,将 NServiceBus 更改为接受批处理需要针对批处理优化整个管道。单个消息(当前规范)将是数组大小为 1 的专门批处理。

所以从本质上讲,对于它所提供的有限的商业价值来说,这种改变的风险太大了。

建议

我发现,对每条消息执行一次插入并不像我想象的那么昂贵。糟糕的是,多个 Web 服务器上的多个线程尝试同时写入数据库并卡在该 RPC 操作中,直到完成。

当这些动作被序列化为queue,并且有限设置线程数处理这些消息并以数据库可以处理的速度进行数据库插入,大多数情况下,事情往往 运行 相当顺利。

此外,请仔细考虑您在数据库中所做的事情。对现有行的更新比插入便宜很多。就我而言,我真的只关心计数,不需要为每个单独的页面视图记录。因此,根据内容 ID 和 5 分钟时间 window 更新记录并更新该记录的读取计数比每次读取都插入一条记录并强迫自己进行大量聚合查询更便宜.

如果这绝对行不通,您需要考虑可以在可靠性方面做出哪些权衡。您可以使用具有 in-memory 持久性的 Saga,但是您可能(并且很可能最终)丢失整个批次。这可能是可以接受的,具体取决于您的用例。

您还可以使用消息处理程序写入 Redis,这比数据库更便宜,然后有一个更像调度程序的 Saga 将数据批量迁移到数据库。您可能可以使用 Kafka 或许多其他技术来做类似的事情。在这些情况下,由您来决定您需要什么样的可靠性保证并设置可以提供的工具。