Kafka Streams 中 KStream-KStream join 的中间结果去重
Deduplicate intermediate results of KStream-KStream joins in Kafka Streams
我有以下场景:
- Table A 和 Table B 使用 FK 加入。
- A 和 B 的交易 insert/update。
- Debezium 为 Table A 发出一个事件
a
,为 table B 发出一个事件 b
。
- Kafka Streams 为 Table A 和 B 创建 KStream。
- Kafka Streams 应用程序
leftJoin
KStreams A 和 B。(假设 a
和 b
记录具有相同的键并落入联接 window)。
- 输出记录将为
[a, null], [a, b]
。
如何丢弃 [a, null]
?
一个选项是执行 innerJoin
,但在 update
查询的情况下这仍然是一个问题。
我们尝试使用事件时间戳进行过滤(即保持事件具有最新时间戳),但不能保证时间戳的唯一性。
即。最终目标是能够识别最新的聚合,以便我们可以在查询时过滤掉中间结果(在 Athena/Presto 或某些 RDBMS 中)。
目前,我发现的最佳工作方法是利用输出记录中的 Kafka 偏移量。
方法可以概括为:
- 执行您想要执行的所有逻辑,不用担心同一个键的多个记录。
- 将结果写入中间主题,保留时间非常短(1 小时等)
- 使用处理器读取中间主题,并在处理器内使用
context.offset()
使用 Kafka 偏移量丰富消息。
- 将消息写到输出主题。
现在,您的输出主题包含同一个键的多条消息,但每条消息都有不同的偏移量。
在查询期间,现在您可以 select 使用子查询每个键的最大偏移量。
下面是一个 TransformerSupplier 示例
/**
* @param <K> key type
* @param <V> value type
*/
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
@Override
public Transformer<K, V, KeyValue<String, String>> get() {
return new OutputTransformer<>();
}
private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
/**
* @param key the key for the record
* @param value the value for the record
*/
@Override
public KeyValue<String, String> transform(K key, V value) {
if (value != null) {
value.setKafkaOffset(context.offset());
}
return new KeyValue<>(key, value);
}
@Override
public KeyValue<String, String> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
// nothing to close
}
}
}
我有以下场景:
- Table A 和 Table B 使用 FK 加入。
- A 和 B 的交易 insert/update。
- Debezium 为 Table A 发出一个事件
a
,为 table B 发出一个事件b
。 - Kafka Streams 为 Table A 和 B 创建 KStream。
- Kafka Streams 应用程序
leftJoin
KStreams A 和 B。(假设a
和b
记录具有相同的键并落入联接 window)。 - 输出记录将为
[a, null], [a, b]
。
如何丢弃 [a, null]
?
一个选项是执行 innerJoin
,但在 update
查询的情况下这仍然是一个问题。
我们尝试使用事件时间戳进行过滤(即保持事件具有最新时间戳),但不能保证时间戳的唯一性。
即。最终目标是能够识别最新的聚合,以便我们可以在查询时过滤掉中间结果(在 Athena/Presto 或某些 RDBMS 中)。
目前,我发现的最佳工作方法是利用输出记录中的 Kafka 偏移量。
方法可以概括为:
- 执行您想要执行的所有逻辑,不用担心同一个键的多个记录。
- 将结果写入中间主题,保留时间非常短(1 小时等)
- 使用处理器读取中间主题,并在处理器内使用
context.offset()
使用 Kafka 偏移量丰富消息。 - 将消息写到输出主题。
现在,您的输出主题包含同一个键的多条消息,但每条消息都有不同的偏移量。
在查询期间,现在您可以 select 使用子查询每个键的最大偏移量。
下面是一个 TransformerSupplier 示例
/**
* @param <K> key type
* @param <V> value type
*/
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
@Override
public Transformer<K, V, KeyValue<String, String>> get() {
return new OutputTransformer<>();
}
private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
/**
* @param key the key for the record
* @param value the value for the record
*/
@Override
public KeyValue<String, String> transform(K key, V value) {
if (value != null) {
value.setKafkaOffset(context.offset());
}
return new KeyValue<>(key, value);
}
@Override
public KeyValue<String, String> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
// nothing to close
}
}
}