Kafka Streams 中 KStream-KStream join 的中间结果去重

Deduplicate intermediate results of KStream-KStream joins in Kafka Streams

我有以下场景:

  1. Table A 和 Table B 使用 FK 加入。
  2. A 和 B 的交易 insert/update。
  3. Debezium 为 Table A 发出一个事件 a,为 table B 发出一个事件 b
  4. Kafka Streams 为 Table A 和 B 创建 KStream。
  5. Kafka Streams 应用程序 leftJoin KStreams A 和 B。(假设 ab 记录具有相同的键并落入联接 window)。
  6. 输出记录将为 [a, null], [a, b]

如何丢弃 [a, null]

一个选项是执行 innerJoin,但在 update 查询的情况下这仍然是一个问题。

我们尝试使用事件时间戳进行过滤(即保持事件具有最新时间戳),但不能保证时间戳的唯一性。

即。最终目标是能够识别最新的聚合,以便我们可以在查询时过滤掉中间结果(在 Athena/Presto 或某些 RDBMS 中)。

目前,我发现的最佳工作方法是利用输出记录中的 Kafka 偏移量。

方法可以概括为:

  1. 执行您想要执行的所有逻辑,不用担心同一个键的多个记录。
  2. 将结果写入中间主题,保留时间非常短(1 小时等)
  3. 使用处理器读取中间主题,并在处理器内使用 context.offset() 使用 Kafka 偏移量丰富消息。
  4. 将消息写到输出主题。

现在,您的输出主题包含同一个键​​的多条消息,但每条消息都有不同的偏移量。

在查询期间,现在您可以 select 使用子查询每个键的最大偏移量。

下面是一个 TransformerSupplier 示例

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() {
    return new OutputTransformer<>();
  }

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
      this.context = context;
    }

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) {
      if (value != null) {
        value.setKafkaOffset(context.offset());
      }
      return new KeyValue<>(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
      return null;
    }

    @Override
    public void close() {
      // nothing to close
    }
  }
}