访问 Kafka Streams 中聚合器内的 TimeWindow 属性
Access TimeWindow properties inside an aggregator in Kafka Streams
我想用 Kafka-Streams 在一段时间内流式传输主题的最新记录 window,我想将输出记录的时间戳设置为等于时间的结束 window 记录已注册。
我的问题是我无法在聚合器内部访问 window 属性。
这是我现在拥有的代码:
KS0
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
)
.aggregate(
Constants::getInitialAssetTimeValue,
this::aggregator,
Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
.withValueSerde(assetTimeValueSerde) /* serde for aggregate value */
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(this.toTopic);
而我使用的聚合函数是这个:
private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){
// I want to do something like that, but this only works with windowed Keys to which I do
// not have access through the aggregator
// windowEndTime = aggKey.window().endTime().getEpochSecond();
return AssetTimeValue.newBuilder()
.setTimestamp(windowEndTime)
.setName(newValue.getName())
.setValue(newValue.getValue())
.build();
}
非常感谢您的帮助!
您只能通过处理器 API 操作时间戳。但是,您可以轻松使用 DSL 中嵌入的处理器 API。
对于您的情况,您可以在 toStream()
和 to()
之间插入一个 transform()
。在 Transformer
中调用 context.forward(key, value, To.all().withTimestamp(...))
来设置新的时间戳。此外,你会在最后 return null
(null
意味着不发出任何记录,因为你已经为此目的使用了 context.forward
)。
我想用 Kafka-Streams 在一段时间内流式传输主题的最新记录 window,我想将输出记录的时间戳设置为等于时间的结束 window 记录已注册。
我的问题是我无法在聚合器内部访问 window 属性。
这是我现在拥有的代码:
KS0
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
)
.aggregate(
Constants::getInitialAssetTimeValue,
this::aggregator,
Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
.withValueSerde(assetTimeValueSerde) /* serde for aggregate value */
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(this.toTopic);
而我使用的聚合函数是这个:
private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){
// I want to do something like that, but this only works with windowed Keys to which I do
// not have access through the aggregator
// windowEndTime = aggKey.window().endTime().getEpochSecond();
return AssetTimeValue.newBuilder()
.setTimestamp(windowEndTime)
.setName(newValue.getName())
.setValue(newValue.getValue())
.build();
}
非常感谢您的帮助!
您只能通过处理器 API 操作时间戳。但是,您可以轻松使用 DSL 中嵌入的处理器 API。
对于您的情况,您可以在 toStream()
和 to()
之间插入一个 transform()
。在 Transformer
中调用 context.forward(key, value, To.all().withTimestamp(...))
来设置新的时间戳。此外,你会在最后 return null
(null
意味着不发出任何记录,因为你已经为此目的使用了 context.forward
)。