具有在 HDFS 上查找数据的 Kafka Streams

Kafka Streams with lookup data on HDFS

我正在使用 Kafka Streams (v0.10.0.1) 编写应用程序,并希望使用查找数据丰富我正在处理的记录。此数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录。

如何在 Kafka Streams 应用程序中加载它并加入实际的 KStream
当新文件到达那里时从 HDFS 重新读取数据的最佳做法是什么?

或者切换到 Kafka Connect 并将 RDBMS table 内容写入 Kafka 主题可以被所有 Kafka Streams 应用程序实例使用会更好吗?

更新:
正如建议的那样,Kafka Connect 将是必经之路。因为查找数据在 RDBMS 中以 每天 为基础进行更新,所以我正在考虑 运行 Kafka Connect 作为计划 one-off job 而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断等的开销。对我来说,在这种情况下进行预定的提取看起来更安全。

查找数据不大,记录可能删除/添加/修改。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不会起作用,因为我不知道源系统中删除了什么。另外 AFAIK 当压缩发生时我没有控制权。

推荐的方法确实是将查找数据也提取到 Kafka 中——例如通过 Kafka Connect——正如你在上面自己建议的那样。

But in this case how can I schedule the Connect job to run on a daily basis rather than continuously fetch from the source table which is not necessary in my case?

也许您可以更新您的问题,您不想让 Kafka Connect 作业持续进行 运行?您是否担心资源消耗(数据库上的负载),如果不是 "daily udpates" 或...,您是否担心处理的语义?

Update: As suggested Kafka Connect would be the way to go. Because the lookup data is updated in the RDBMS on a daily basis I was thinking about running Kafka Connect as a scheduled one-off job instead of keeping the connection always open. Yes, because of semantics and the overhead of keeping a connection always open and making sure that it won't be interrupted..etc. For me having a scheduled fetch in this case looks safer.

Kafka Connect 安全,并且 JDBC 连接器的构建正是为了将数据库表以健壮、容错且高效的方式(已经有很多生产部署)。所以我建议不要仅仅因为 "it looks safer" 就回退到 "batch update" 模式;就我个人而言,我认为触发每日摄取在操作上不如保持它 运行 以进行连续(和实时!)摄取方便,而且它还会为您的实际用例带来一些不利影响(请参阅下一段)。

当然,您的进度可能会有所不同——因此,如果您打算每天只更新一次,那就去做吧。但是你失去了 a) 在充实发生的时间点用最新的数据库数据充实你的传入记录的能力,并且,相反地,b) 你实际上可能用 stale/old 数据充实传入记录直到下一次每日更新完成,这很可能会导致您向下游发送/提供给其他应用程序使用的数据不正确。例如,如果客户更新了她的送货地址(在数据库中),但您每天只将此信息提供给您的流处理应用程序(以及可能的许多其他应用程序)一次,那么订单处理应用程序会将包裹运送到错误的地方地址直到下一次每日摄取完成。

The lookup data is not big and records may be deleted / added / modified. I don't know either how I can always have a full dump into a Kafka topic and truncate the previous records. Enabling log compaction and sending null values for the keys that have been deleted would probably won't work as I don't know what has been deleted in the source system.

Kafka Connect 的 JDBC 连接器已经为您自动处理:1. 它确保 DB inserts/updates/deletes 正确反映在 Kafka 主题中,以及 2. Kafka 的日志压缩确保目标话题不会越界。也许您可能想阅读文档中的 JDBC 连接器以了解您刚刚免费获得的功能:http://docs.confluent.io/current/connect/connect-jdbc/docs/ ?