Kafka Stream 配合 JoinWindow 进行数据重放

Kafka Stream work with JoinWindow for data replay

我有 2 个数据流,我希望能够加入它们 window 1 个月。当我有实时数据时,使用 KStreamjoin,一切都变得有趣且超级简单。我做了这样的事情;

KStream<String, GenericRecord> stream1 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());

KStream<String, GenericRecord> stream2 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

    KStream<String, GenericRecord> joinStream = stream1.join(stream2,
            new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                @Override
                public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
                    final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
                    ....
                    ....
                    ....

                    return jonnedRecord;
                }
            }, JoinWindows.of(joinWindowSizeMs));

当我想做数据回放时出现问题。假设我想为过去 6 个月的数据重新进行这些连接,因为我 运行 一次为所有数据设置管道 kafkaStream 将连接所有可连接的数据并且不需要时间考虑差异(它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
这次我该如何更改和操作,以便我可以 运行 我的数据正确重放,我的意思是重新插入过去 6 个月的数据,每个记录需要 window 一个月并加入基于那个的。

这个问题与 不重复,在那里我询问了如何根据时间 window 加入。这里我说的是数据重放。根据我在加入 Kafka 期间的理解,将数据插入主题的时间作为 JoinWindow 的时间,因此如果您想重放数据并重新插入 6 个月前的数据,kafka 将其作为新数据今天插入并将它与一些实际上今天不应该的其他数据一起加入。

Kafka 的 Streams API 使用 TimestampExtractor 返回的时间戳来计算连接。默认情况下,这是记录的嵌入元数据时间戳。 (c.f.http://docs.confluent.io/current/streams/concepts.html#time)

默认情况下,KafkaProducer 将此时间戳设置为写入时的当前系统时间。 (作为替代方案,您可以在 per-topic 基础上配置代理,以在代理存储记录时使用代理的系统时间覆盖记录的 producer-provided 时间戳——这提供了 "ingestion time" 语义.)

因此,这本身不是 Kafka Streams 问题。

有多个选项可以解决这个问题:

  1. 如果您的数据已经在一个主题中,您可以简单地重置您的 Streams 应用程序以重新处理旧数据。为此,您可以使用应用程序重置工具 (bin/kafka-streams-application-reset.sh)。您还需要在您的 Streams 应用中将 auto.offset.reset 策略指定为 earliest。查看文档——另外,建议阅读博客 post.

这是最好的方法,因为您不需要再次向主题写入数据。

  1. 如果您的数据不在主题中并且您需要写入数据,您可以通过为每条记录提供时间戳来在应用程序级别明确设置记录时间戳:
KafkaProducer producer = new KafkaProducer(...);
producer.send(new ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value));

因此,如果您摄取旧数据,您可以显式设置时间戳,Kafka Streams 将拾取它并相应地计算连接。