Kafka Streams 窗口是如何工作的?

How does Kafka Streams windowing work?

我很难理解 Windowing 在 Kafka Streams 中是如何工作的。到目前为止,结果似乎与我所阅读和理解的内容不一致。

我创建了一个带有支持主题的 KSQL 流。 KSQL SELECT 语句中的 'columns' 之一已被指定为主题的时间戳。

CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;

my-stream-topic 中的记录按键 (PARTITION_KEY) 分组,并 window 跳频 window

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed) 
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))

记录通过

聚合
val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] {<code omitted>}},
      new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
      Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)

然后我通过

打印到控制台
dataAgg.print()
topicStats.print()

组中的第一个 window 转换为 7:00 - 7:05

当我通过控制台消费者检查 my-stream-topic 中的记录时,我看到有 2 条记录应该属于上述 window。但是,聚合器只收集了其中的 1 个。

我认为 dataAgg windowed KTable 将包含分组键的 1 条记录,但聚合将使用这 2 条记录来计算聚合。打印的合计值不正确。

我错过了什么?

KSQL 可以在写入时设置记录时间戳,但是您需要在创建输入流时指定时间戳,而不是在定义输出流时指定时间戳。即,为输入流指定的时间戳将用于在写入时设置记录元数据字段。

这种行为相当不直观,我为此问题开了一个工单:https://github.com/confluentinc/ksql/issues/1367

因此,在为问题中显示的查询创建输入流时,您需要指定 with(TIMESTAMP='recorded_timestamp') 子句。如果这不可能,因为您的查询需要对不同的时间戳进行操作,您需要指定第二个查询将数据复制到新主题中。

CREATE STREAM my_stream_with_ts
    WITH (KAFKA_topic='my-stream-topic-with-ts')
AS select * from my_stream PARTITION BY PARTITION_KEY;

作为替代方案,您可以为 Kafka Streams 应用程序设置自定义时间戳提取器,以从有效负载中提取时间戳。