访问 Kafka Streams 中聚合器内的 TimeWindow 属性

Access TimeWindow properties inside an aggregator in Kafka Streams

我想用 Kafka-Streams 在一段时间内流式传输主题的最新记录 window,我想将输出记录的时间戳设置为等于时间的结束 window 记录已注册。

我的问题是我无法在聚合器内部访问 window 属性。

这是我现在拥有的代码:

    KS0
        .groupByKey()
        .windowedBy(
            TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
        )
        .aggregate(
            Constants::getInitialAssetTimeValue,
            this::aggregator,
            Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
                .withValueSerde(assetTimeValueSerde)   /* serde for aggregate value */
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
        .to(this.toTopic);

而我使用的聚合函数是这个:

private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){

 // I want to do something like that, but this only works with windowed Keys to which I do  
 // not have access through the aggregator
 // windowEndTime = aggKey.window().endTime().getEpochSecond();   

    return AssetTimeValue.newBuilder()
            .setTimestamp(windowEndTime)
            .setName(newValue.getName())
            .setValue(newValue.getValue())
            .build();
}

非常感谢您的帮助!

您只能通过处理器 API 操作时间戳。但是,您可以轻松使用 DSL 中嵌入的处理器 API。

对于您的情况,您可以在 toStream()to() 之间插入一个 transform()。在 Transformer 中调用 context.forward(key, value, To.all().withTimestamp(...)) 来设置新的时间戳。此外,你会在最后 return nullnull 意味着不发出任何记录,因为你已经为此目的使用了 context.forward)。