对 Kafka 主题流执行查找的理想方式
Ideal way to perform lookup on a stream of Kafka topic
我有以下用例:
有一个关于 Kafka 主题的记录流。我有另一组唯一 ID。对于流中的每条记录,我需要检查流的 ID 是否存在于我拥有的唯一 ID 集中。基本上,这应该用作我的 Kafka Streams 应用程序的过滤器。即,仅将与我拥有的唯一 ID 集匹配的 Kafka 主题记录写入另一个主题。
我们目前的应用是基于 Kafka Streams 的。我查看了 KStreams 和 KTables。看起来它们很适合丰富。现在,我不需要对数据进行任何丰富。至于使用状态存储,我不确定它们作为可扩展解决方案有多好。
我想做这样的事情:
kStream.filter((k, v) -> {
valueToCheckInKTable = v.get(FIELD_NAME);
if (kTable.containsKey(valueToCheckInKTable)) return record
else ignore
});
查找数据可能非常庞大。有人可以建议最好的方法吗?
您可以通过 builder.table("id-topic")
将引用 ID 读入 table 并将 ID 作为主键(注意该值必须是非空的——否则它会被解释为删除-- 如果没有实际值,则在将 ID 写入 id-topic
时,只需将每条记录的任何非空虚拟值放入即可。要在启动时加载完整的 table,您可能希望通过 table()
运算符上的 Consumed
参数提供始终 returns 0
的自定义时间戳提取器(记录按时间戳顺序处理,returning 0
确保首先处理来自 id-topic
的记录以加载 table)。
要进行过滤,您需要执行一个流-table join:
KStream stream = builder.stream(...);
// the key in the stream must be ID, if this is not the case, you can use `selectKey()` to set a new ke
KStream filteredStream = stream.join(table,...);
由于您不想做任何丰富,提供的 Joiner
函数可以 return 不修改左侧流侧值(并且可以忽略右侧 table值)。
我有以下用例: 有一个关于 Kafka 主题的记录流。我有另一组唯一 ID。对于流中的每条记录,我需要检查流的 ID 是否存在于我拥有的唯一 ID 集中。基本上,这应该用作我的 Kafka Streams 应用程序的过滤器。即,仅将与我拥有的唯一 ID 集匹配的 Kafka 主题记录写入另一个主题。
我们目前的应用是基于 Kafka Streams 的。我查看了 KStreams 和 KTables。看起来它们很适合丰富。现在,我不需要对数据进行任何丰富。至于使用状态存储,我不确定它们作为可扩展解决方案有多好。
我想做这样的事情:
kStream.filter((k, v) -> {
valueToCheckInKTable = v.get(FIELD_NAME);
if (kTable.containsKey(valueToCheckInKTable)) return record
else ignore
});
查找数据可能非常庞大。有人可以建议最好的方法吗?
您可以通过 builder.table("id-topic")
将引用 ID 读入 table 并将 ID 作为主键(注意该值必须是非空的——否则它会被解释为删除-- 如果没有实际值,则在将 ID 写入 id-topic
时,只需将每条记录的任何非空虚拟值放入即可。要在启动时加载完整的 table,您可能希望通过 table()
运算符上的 Consumed
参数提供始终 returns 0
的自定义时间戳提取器(记录按时间戳顺序处理,returning 0
确保首先处理来自 id-topic
的记录以加载 table)。
要进行过滤,您需要执行一个流-table join:
KStream stream = builder.stream(...);
// the key in the stream must be ID, if this is not the case, you can use `selectKey()` to set a new ke
KStream filteredStream = stream.join(table,...);
由于您不想做任何丰富,提供的 Joiner
函数可以 return 不修改左侧流侧值(并且可以忽略右侧 table值)。