Kafka Streams Windowing on a different value
Kafka Steams Windowing on a different value
当您使用 kafka 流创建 window 时,我假设它使用记录发布的时间戳? window 有什么办法吗?
我的用例是我们的记录值对象包含一个时间戳,这就是我们想要window的。
如果我这样做,它将 window 在发布的时间戳上。我想通过 myObject.getCallTimestamp()
window
KTable<Windowed<String>, MyObject> windowedPageViewCounts = pageViews
.groupByKey(Serialized.with(Serdes.String(), myObjectSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.count();
编辑:
根据下面的建议,我认为这是我需要做的?
public class RecordTimeStampExtractor implements TimestampExtractor {
//default timestamp extractor
private FailOnInvalidTimestamp failOnInvalidTimestamp = new FailOnInvalidTimestamp();
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
//could also use consumerRecord.topic().equals("mytopic")
if(consumerRecord.value() instanceof MyClass) {
MyClass myClass = (MyClass) consumerRecord.value();
return myClass.getRecordTimestamp().toEpochMilli();
}
return failOnInvalidTimestamp.extract(consumerRecord,l);
}
}
您可以实施和配置(通过 default.timestamp.extractor
)自定义 TimestampExtractor
returns myObject.getCallTimestamp()
.
当您使用 kafka 流创建 window 时,我假设它使用记录发布的时间戳? window 有什么办法吗?
我的用例是我们的记录值对象包含一个时间戳,这就是我们想要window的。
如果我这样做,它将 window 在发布的时间戳上。我想通过 myObject.getCallTimestamp()
windowKTable<Windowed<String>, MyObject> windowedPageViewCounts = pageViews
.groupByKey(Serialized.with(Serdes.String(), myObjectSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.count();
编辑:
根据下面的建议,我认为这是我需要做的?
public class RecordTimeStampExtractor implements TimestampExtractor {
//default timestamp extractor
private FailOnInvalidTimestamp failOnInvalidTimestamp = new FailOnInvalidTimestamp();
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
//could also use consumerRecord.topic().equals("mytopic")
if(consumerRecord.value() instanceof MyClass) {
MyClass myClass = (MyClass) consumerRecord.value();
return myClass.getRecordTimestamp().toEpochMilli();
}
return failOnInvalidTimestamp.extract(consumerRecord,l);
}
}
您可以实施和配置(通过 default.timestamp.extractor
)自定义 TimestampExtractor
returns myObject.getCallTimestamp()
.