使用 Kafka 进行更新和删除的数据集成

Using Kafka for Data Integration with Updates & Deletes

那么一点背景知识 - 我们有大量的数据源,从 RDBMS 到 S3 文件。我们希望将这些数据与其他各种数据仓库、数据库等同步集成

起初,这似乎是 Kafka 的规范模型。我们希望通过 Kafka 将数据更改流式传输到数据输出源。在我们的测试案例中,我们使用 Oracle Golden Gate 捕获更改并成功将更改推送到 Kafka 队列。然而,事实证明,将这些更改推送到数据输出源具有挑战性。

我意识到,如果我们只是向 Kafka 主题和队列添加新数据,这会非常有效。我们可以缓存更改并将更改写入各种数据输出源。然而,这种情况并非如此。我们会更新,删除,修改分区等等。处理这个的逻辑似乎要复杂得多。

我们尝试使用暂存表并连接到 update/delete 数据,但我觉得这很快就会变得非常笨拙。

这就是我的问题 - 我们是否可以采用任何不同的方法来处理这些操作?还是我们应该完全朝不同的方向前进?

任何 suggestions/help 非常感谢。谢谢!

您可以采用 3 种方法:

  1. 完全卸载负载
  2. 增量转储负载
  3. 二进制日志复制

完全卸载负载

定期将 RDBMS 数据源 table 转储到一个文件中,然后将其加载到数据仓库中,替换以前的版本。这种方法主要用于小型 tables,但实现起来非常简单,并且支持轻松更新和删除数据。

增量转储负载

定期获取自上次查询以来更改的记录,并将它们发送到数据仓库以供加载。类似于

SELECT *
FROM my_table
WHERE last_update > #{last_import}

这种方法实施起来稍微复杂一些,因为您必须维护状态(上面代码段中的"last_import"),并且它不支持删除。可以扩展它以支持删除,但这会使它变得更加复杂。这种方法的另一个缺点是它要求您的 table 有一个 last_update 列。

二进制日志复制

编写一个程序,持续监听 RDBMS 的二进制日志并将这些更新发送到数据仓库中的中间 table,包含行的更新值,以及它是否是一个删除操作或update/create。然后编写一个定期合并这些更新的查询,以创建一个反映原始 table 的 table。此合并过程背后的想法是 select,对于每个 id,所有更新中看到的最后(最先进的)版本,或合并的先前版本 table.

这种方法实施起来稍微复杂一些,但即使在大型 table 上也能实现高性能,并支持更新和删除。

Kafka 与此方法相关,因为它可以用作 binlog 侦听器和加载到数据仓库中间层之间的行更新的管道 table.


您可以阅读有关这些内容的更多信息different replication approaches in this blog post

披露:我在 Alooma 工作(一位同事写了上面链接的博客 post,我们提供数据管道作为服务,解决了这样的问题)。