Mulesoft 与 Salesforce Streaming API 使用 CDC

Mulesoft with Salesforce Streaming API using CDC

我正在使用 Mule API 流程测试 Salesforce 事件流。我设置了连接器并订阅了流媒体频道。

当我创建/更新/删除联系人记录时,这工作得很好,事件通过,我通过将它们添加到另一个数据库来处理它们。

我对 replayId 功能有点困惑。使用当前设置,我可以关闭 Mule 应用程序,在组织中创建联系人,然后当我将应用程序重新联机时,它会通过从中断处添加数据来恢复。完美。

但是,我正在尝试模拟如果 mule 应用程序在处理事件时崩溃会发生什么。

我 运行 一些 APEX 创建 100 运行dom 联系人记录。当我看到它在我的应用程序中记录第一个流时,我就终止了 mule 应用程序。我在这里的假设是,当我恢复应用程序时,它会知道它在哪里停止,就好像它在之前的测试中创建联系人之前处于离线状态一样。

我注意到它只处理在我关闭应用程序之前完成的少数联系人。

似乎流输入中的事件可能来得太快,以至于它已经到达流中的最后一个 replayId。但是,由于这些记录仍未添加到我的外部数据库中,因此我丢失了这些记录。流做了它应该做的,但由于应用程序仍在处理批处理工作,我的 100 条记录没有像 replayId 反映的那样提交。

如果在应用程序崩溃之前存在大量数据流,我该如何处理才能避免丢失数据?我记得在使用 Kafka 时,你必须能够 commit 将 id 插入数据库后,它才能知道你正式处理的最后一个。 Mule 中是否有这样一个概念,我可以告诉它我在何处正式停止并提交给 DB?

协议 (CometD) 级别的可靠性意味着许多属性。其中最主要的是订阅者已收到消息的事务性 ACK(知识)。 CometD 支持 ACK 作为扩展。 Salesforce 的 CometD 实施不支持 ACK。即使是这样,您仍然有 issues...但是 frequency/loss 的风险可能会更低。

在您的情况下,您必须设计一个解决方案,该解决方案相当于查找和重播未提交给目标数据库的事件。您可以使用 Mule 中的自定义代码或接线适配器来执行此操作。不保证重播 ID 值对于连续事件是连续的,但它们将被排序。重放 ID 为 100 的事件 A 后跟重放 ID 为 200 的事件 B。

您需要在数据库中存储重播 ID 值。然后,您可以在重新订阅时使用它(在订阅者失败后)从 SF 检索数据库中丢失的事件。这仅在故障 window 足够小的情况下才有效。对于标准平台事件许可证,Salesforce 事件保留 window 目前为 24 小时。 Higher-level 许可证允许保留更长时间。

根据数据量、事件频率和其他过程参数,您可以使用 Heroku Connect 立即获得所有这些。它确实意味着 Heroku 上的 Postgres 数据库 + HC 的许可成本和运营成本,但我们大多数处于类似情况的客户都认为这是值得的。