当 KTable 中缺少键时处理 KStream 与 KTable 的连接
Handle join of KStream with KTable when key is missing from KTable
我最近开始尝试使用 kafka 流。我有一个场景,我需要加入一个 KStream
和一个 KTable
。 KTable
可能不包含某些键。在那种情况下,我得到一个 NullPointerException
.
特别是我得到
stream-thread [StreamThread-1] Streams application error during processing:
java.lang.NullPointerException
我不知道该如何处理。我无法以某种方式过滤掉与 table 条目不对应的流记录。
更新
再往下看,发现可以通过ReadOnlyKeyValueStore
接口查询底层store是否存在key。
在这种情况下,我的问题是,这是最好的方法吗?即根据本地商店中是否存在密钥来过滤要加入的流?
我在这种情况下的第二个问题是,因为我关心在下一阶段利用版本 10.2
中引入的 Global State Store
,我是否应该期望我也能够在同一阶段Global State Store
?
的查询方式
更新
之前的更新不准确,因为无法从拓扑内部查询状态存储
最后更新
在更好地理解连接语义后,我能够解决问题只是将 valueJoiner
简化为仅 return 结果,而不是对连接值执行操作,并添加连接后的额外过滤步骤过滤掉空值。
我的问题的解决方案来自更好地理解 join
语义。
就像在数据库连接中一样(虽然我不是说 Kstream
连接精确地遵循数据库连接概念)左连接操作导致在缺少右侧键的情况下具有空值的行。
所以最终我唯一要做的就是将我的 valueJoiner
与后续计算/操作分离(我需要对连接记录的字段和 return 一个新的构造的对象)并且只有 return 一个连接值的数组。然后我可以通过检查这些数组来过滤掉导致 null
值的记录。
根据 Matthias 的 J. Sax 的建议,我使用了 0.10.2
版本而不是与代理版本 0.10.1
兼容的 0.10.1
并替换了整个 leftJoin
具有内部连接的逻辑,无需过滤掉 null
值。
我最近开始尝试使用 kafka 流。我有一个场景,我需要加入一个 KStream
和一个 KTable
。 KTable
可能不包含某些键。在那种情况下,我得到一个 NullPointerException
.
特别是我得到
stream-thread [StreamThread-1] Streams application error during processing: java.lang.NullPointerException
我不知道该如何处理。我无法以某种方式过滤掉与 table 条目不对应的流记录。
更新
再往下看,发现可以通过ReadOnlyKeyValueStore
接口查询底层store是否存在key。
在这种情况下,我的问题是,这是最好的方法吗?即根据本地商店中是否存在密钥来过滤要加入的流?
我在这种情况下的第二个问题是,因为我关心在下一阶段利用版本 10.2
中引入的 Global State Store
,我是否应该期望我也能够在同一阶段Global State Store
?
更新
之前的更新不准确,因为无法从拓扑内部查询状态存储
最后更新
在更好地理解连接语义后,我能够解决问题只是将 valueJoiner
简化为仅 return 结果,而不是对连接值执行操作,并添加连接后的额外过滤步骤过滤掉空值。
我的问题的解决方案来自更好地理解 join
语义。
就像在数据库连接中一样(虽然我不是说 Kstream
连接精确地遵循数据库连接概念)左连接操作导致在缺少右侧键的情况下具有空值的行。
所以最终我唯一要做的就是将我的 valueJoiner
与后续计算/操作分离(我需要对连接记录的字段和 return 一个新的构造的对象)并且只有 return 一个连接值的数组。然后我可以通过检查这些数组来过滤掉导致 null
值的记录。
根据 Matthias 的 J. Sax 的建议,我使用了 0.10.2
版本而不是与代理版本 0.10.1
兼容的 0.10.1
并替换了整个 leftJoin
具有内部连接的逻辑,无需过滤掉 null
值。