当 KTable 中缺少键时处理 KStream 与 KTable 的连接

Handle join of KStream with KTable when key is missing from KTable

我最近开始尝试使用 kafka 流。我有一个场景,我需要加入一个 KStream 和一个 KTableKTable 可能不包含某些键。在那种情况下,我得到一个 NullPointerException.

特别是我得到

stream-thread [StreamThread-1] Streams application error during processing: java.lang.NullPointerException

我不知道该如何处理。我无法以某种方式过滤掉与 table 条目不对应的流记录。

更新

再往下看,发现可以通过ReadOnlyKeyValueStore接口查询底层store是否存在key。

在这种情况下,我的问题是,这是最好的方法吗?即根据本地商店中是否存在密钥来过滤要加入的流?

我在这种情况下的第二个问题是,因为我关心在下一阶段利用版本 10.2 中引入的 Global State Store,我是否应该期望我也能够在同一阶段Global State Store?

的查询方式

更新

之前的更新不准确,因为无法从拓扑内部查询状态存储

最后更新

在更好地理解连接语义后,我能够解决问题只是将 valueJoiner 简化为仅 return 结果,而不是对连接值执行操作,并添加连接后的额外过滤步骤过滤掉空值。

我的问题的解决方案来自更好地理解 join 语义。

就像在数据库连接中一样(虽然我不是说 Kstream 连接精确地遵循数据库连接概念)左连接操作导致在缺少右侧键的情况下具有空值的行。

所以最终我唯一要做的就是将我的 valueJoiner 与后续计算/操作分离(我需要对连接记录的字段和 return 一个新的构造的对象)并且只有 return 一个连接值的数组。然后我可以通过检查这些数组来过滤掉导致 null 值的记录。

根据 Matthias 的 J. Sax 的建议,我使用了 0.10.2 版本而不是与代理版本 0.10.1 兼容的 0.10.1 并替换了整个 leftJoin具有内部连接的逻辑,无需过滤掉 null 值。