使用 Kafka Streams 在输出中设置时间戳
Set timestamp in output with Kafka Streams
我在 Kafka 主题 "raw-data" 中获取 CSV,目标是通过在另一个主题 "data" 中发送具有正确时间戳(每行不同)的每一行来转换它们。
目前,我有 2 个主播:
- 一个拆分 "raw-data" 中的行,将它们发送到 "internal" 主题(无时间戳)
- 一个
TimestampExtractor
消耗 "internal" 并将它们发送到 "data"。
我想通过直接设置时间戳来删除这个 "internal" 主题的使用,但我找不到办法(时间戳提取器仅在消费时使用)。
我在文档中偶然发现了这一行:
Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().
但我找不到任何带有时间戳的签名。它们是什么意思?
你会怎么做?
编辑:
明确地说,我有一个 Kafka 主题,其中一条消息包含事件时间和一些值,例如:
2018-01-01,hello
2018-01-02,world
(这是一条消息,不是两条)
我想在另一个主题中获取两条消息,并将 Kafka 记录时间戳设置为其事件时间(2018-01-01 和 2018-01-02),而不需要中间主题。
设置输出的时间戳需要 Kafka Streams 2.0,并且仅在处理器 API 中受支持。如果您使用 DSL,则可以使用 transform()
来使用那些 API。
正如您所指出的,您将使用 context.forward()
。调用将是:
stream.transform(new TransformerSupplier() {
public Transformer get() {
return new Transformer() {
// omit other methods for brevity
// you need to get the `context` from `init()`
public KeyValue transform(K key, V value) {
// some business logic
// you can call #forward() as often as you want
context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));
return null; // only return data via context#forward()
}
}
}
});
我在 Kafka 主题 "raw-data" 中获取 CSV,目标是通过在另一个主题 "data" 中发送具有正确时间戳(每行不同)的每一行来转换它们。
目前,我有 2 个主播:
- 一个拆分 "raw-data" 中的行,将它们发送到 "internal" 主题(无时间戳)
- 一个
TimestampExtractor
消耗 "internal" 并将它们发送到 "data"。
我想通过直接设置时间戳来删除这个 "internal" 主题的使用,但我找不到办法(时间戳提取器仅在消费时使用)。
我在文档中偶然发现了这一行:
Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().
但我找不到任何带有时间戳的签名。它们是什么意思?
你会怎么做?
编辑: 明确地说,我有一个 Kafka 主题,其中一条消息包含事件时间和一些值,例如:
2018-01-01,hello
2018-01-02,world
(这是一条消息,不是两条)
我想在另一个主题中获取两条消息,并将 Kafka 记录时间戳设置为其事件时间(2018-01-01 和 2018-01-02),而不需要中间主题。
设置输出的时间戳需要 Kafka Streams 2.0,并且仅在处理器 API 中受支持。如果您使用 DSL,则可以使用 transform()
来使用那些 API。
正如您所指出的,您将使用 context.forward()
。调用将是:
stream.transform(new TransformerSupplier() {
public Transformer get() {
return new Transformer() {
// omit other methods for brevity
// you need to get the `context` from `init()`
public KeyValue transform(K key, V value) {
// some business logic
// you can call #forward() as often as you want
context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));
return null; // only return data via context#forward()
}
}
}
});