Kafka Streams Windowing on a different value

Kafka Steams Windowing on a different value

当您使用 kafka 流创建 window 时,我假设它使用记录发布的时间戳? window 有什么办法吗?

我的用例是我们的记录值对象包含一个时间戳,这就是我们想要window的。

如果我这样做,它将 window 在发布的时间戳上。我想通过 myObject.getCallTimestamp()

window
KTable<Windowed<String>, MyObject> windowedPageViewCounts = pageViews
    .groupByKey(Serialized.with(Serdes.String(), myObjectSerde))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .count();

编辑:

根据下面的建议,我认为这是我需要做的?

public class RecordTimeStampExtractor implements TimestampExtractor {

    //default timestamp extractor
    private FailOnInvalidTimestamp failOnInvalidTimestamp = new FailOnInvalidTimestamp();

    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
        //could also use consumerRecord.topic().equals("mytopic")
        if(consumerRecord.value() instanceof MyClass) {
            MyClass myClass = (MyClass) consumerRecord.value();
            return myClass.getRecordTimestamp().toEpochMilli();
        }
        return failOnInvalidTimestamp.extract(consumerRecord,l);
    }
}

您可以实施和配置(通过 default.timestamp.extractor)自定义 TimestampExtractor returns myObject.getCallTimestamp().

有关详细信息,请参阅文档:https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-timestamp-extractor