Kafka 流聚合器中的访问记录偏移量

Access Record Offset in Kafka Streams Aggregator

我有一个简单的 windowing 拓扑:

builder.stream("input-topic", Consumed.with(...))
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .transformValues(FrameTransformer::new)
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

我想将 window 开头的实际记录偏移量放入帧聚合对象中。

如何从 windowAggregatoraggregate() 处理程序)函数访问记录偏移量?

我知道我可以访问 FrameTransformer 中的记录偏移量,但这无助于我创建准确的 Frame 对象来描述我的 windows 开始和结束偏移量。

我听说有一种方法可以通过在 groupByKey() 之前插入另一个 .transform() 调用来实现,在那里我可以访问偏移量,但是我需要修改架构我的事件记录将偏移量信息存储在里面。

有没有(更简单的)方法来实现我的意图?

更新

事实上,我能够通过以下方式在 Frame 对象中获得准确的 window 开始和结束偏移量

builder.stream("input-topic", Consumed.with(...))
    .transformValues(EventTransformer::new)
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

但是如上所述,以编辑 Event 对象的架构为代价。

How can I get access to the record offset from the windowAggregator (aggregate() handler) function?

你不能。您在聚合之前使用 transformValues() 的方法(并丰富 Event 对象是正确的方法。

有人提议扩展 API 以允许在 aggregate() 和其他 DSL 运营商中访问记录元数据,但它从未被推过终点线(参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams) .