Kafka 流聚合器中的访问记录偏移量
Access Record Offset in Kafka Streams Aggregator
我有一个简单的 windowing 拓扑:
builder.stream("input-topic", Consumed.with(...))
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.transformValues(FrameTransformer::new)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
我想将 window 开头的实际记录偏移量放入帧聚合对象中。
如何从 windowAggregator
(aggregate()
处理程序)函数访问记录偏移量?
我知道我可以访问 FrameTransformer
中的记录偏移量,但这无助于我创建准确的 Frame
对象来描述我的 windows 开始和结束偏移量。
我听说有一种方法可以通过在 groupByKey()
之前插入另一个 .transform()
调用来实现,在那里我可以访问偏移量,但是我需要修改架构我的事件记录将偏移量信息存储在里面。
有没有(更简单的)方法来实现我的意图?
更新
事实上,我能够通过以下方式在 Frame
对象中获得准确的 window 开始和结束偏移量
builder.stream("input-topic", Consumed.with(...))
.transformValues(EventTransformer::new)
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
但是如上所述,以编辑 Event
对象的架构为代价。
How can I get access to the record offset from the windowAggregator (aggregate() handler) function?
你不能。您在聚合之前使用 transformValues()
的方法(并丰富 Event
对象是正确的方法。
有人提议扩展 API 以允许在 aggregate()
和其他 DSL 运营商中访问记录元数据,但它从未被推过终点线(参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams) .
我有一个简单的 windowing 拓扑:
builder.stream("input-topic", Consumed.with(...))
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.transformValues(FrameTransformer::new)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
我想将 window 开头的实际记录偏移量放入帧聚合对象中。
如何从 windowAggregator
(aggregate()
处理程序)函数访问记录偏移量?
我知道我可以访问 FrameTransformer
中的记录偏移量,但这无助于我创建准确的 Frame
对象来描述我的 windows 开始和结束偏移量。
我听说有一种方法可以通过在 groupByKey()
之前插入另一个 .transform()
调用来实现,在那里我可以访问偏移量,但是我需要修改架构我的事件记录将偏移量信息存储在里面。
有没有(更简单的)方法来实现我的意图?
更新
事实上,我能够通过以下方式在 Frame
对象中获得准确的 window 开始和结束偏移量
builder.stream("input-topic", Consumed.with(...))
.transformValues(EventTransformer::new)
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
但是如上所述,以编辑 Event
对象的架构为代价。
How can I get access to the record offset from the windowAggregator (aggregate() handler) function?
你不能。您在聚合之前使用 transformValues()
的方法(并丰富 Event
对象是正确的方法。
有人提议扩展 API 以允许在 aggregate()
和其他 DSL 运营商中访问记录元数据,但它从未被推过终点线(参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams) .