如何更改记录的时间戳?
How to change timestamp of records?
我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息。但是 FluentD 使用旧的 KafkaProducer,因此记录时间戳始终设置为 -1。
因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点。
是否有针对 Kafka Streams 的解决方案?
我真正感兴趣的时间戳由 fluentd 在消息中发送:
"timestamp":"1507885936","host":"V.X.Y.Z."
在kafka中记录表示:
offset = 0, timestamp= - 1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
我想在kafka中有这样一条记录:
offset = 0, timestamp= 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
我的解决方法如下:
写一个消费者提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
写一个生产者来生产一个带有时间戳集的新记录(ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
我更喜欢 KafkaStreams 解决方案,如果有的话。
您可以编写一个非常简单的 Kafka Streams 应用程序,例如:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
并使用从记录和 returns 中提取时间戳的自定义 TimestampExtractor
配置应用程序。
Kafka Streams 将在将记录写回 Kafka 时使用返回的时间戳。
Note: if you have out of order data -- ie, timestamps are not strictly ordered -- the result will contain out of order timestamps, too. Kafka Streams uses the returned timestamps to writing back to Kafka (ie, whatever the extractor returns, is used as record metadata timestamp). Note, that on write, the timestamp from the currently processed input record is used for all generated output records -- this hold for version 1.0 but might change in future releases.).
更新:
一般来说,您可以通过处理器修改时间戳API。调用 context.forward()
您可以通过 To.all().withTimestamp(...)
设置输出记录时间戳作为 forward()
.
的参数
我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息。但是 FluentD 使用旧的 KafkaProducer,因此记录时间戳始终设置为 -1。 因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点。
是否有针对 Kafka Streams 的解决方案?
我真正感兴趣的时间戳由 fluentd 在消息中发送:
"timestamp":"1507885936","host":"V.X.Y.Z."
在kafka中记录表示:
offset = 0, timestamp= - 1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
我想在kafka中有这样一条记录:
offset = 0, timestamp= 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
我的解决方法如下:
写一个消费者提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
写一个生产者来生产一个带有时间戳集的新记录(ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
我更喜欢 KafkaStreams 解决方案,如果有的话。
您可以编写一个非常简单的 Kafka Streams 应用程序,例如:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
并使用从记录和 returns 中提取时间戳的自定义 TimestampExtractor
配置应用程序。
Kafka Streams 将在将记录写回 Kafka 时使用返回的时间戳。
Note: if you have out of order data -- ie, timestamps are not strictly ordered -- the result will contain out of order timestamps, too. Kafka Streams uses the returned timestamps to writing back to Kafka (ie, whatever the extractor returns, is used as record metadata timestamp). Note, that on write, the timestamp from the currently processed input record is used for all generated output records -- this hold for version 1.0 but might change in future releases.).
更新:
一般来说,您可以通过处理器修改时间戳API。调用 context.forward()
您可以通过 To.all().withTimestamp(...)
设置输出记录时间戳作为 forward()
.