使用 Kafka Streams 在输出中设置时间戳

Set timestamp in output with Kafka Streams

我在 Kafka 主题 "raw-data" 中获取 CSV,目标是通过在另一个主题 "data" 中发送具有正确时间戳(每行不同)的每一行来转换它们。

目前,我有 2 个主播:

我想通过直接设置时间戳来删除这个 "internal" 主题的使用,但我找不到办法(时间戳提取器仅在消费时使用)。

我在文档中偶然发现了这一行:

Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().

但我找不到任何带有时间戳的签名。它们是什么意思?

你会怎么做?

编辑: 明确地说,我有一个 Kafka 主题,其中一条消息包含事件时间和一些值,例如:

2018-01-01,hello 2018-01-02,world (这是一条消息,不是两条)

我想在另一个主题中获取两条消息,并将 Kafka 记录时间戳设置为其事件时间(2018-01-01 和 2018-01-02),而不需要中间主题。

设置输出的时间戳需要 Kafka Streams 2.0,并且仅在处理器 API 中受支持。如果您使用 DSL,则可以使用 transform() 来使用那些 API。

正如您所指出的,您将使用 context.forward()。调用将是:

stream.transform(new TransformerSupplier() {
  public Transformer get() {
    return new Transformer() {
      // omit other methods for brevity
      // you need to get the `context` from `init()`

      public KeyValue transform(K key, V value) {
        // some business logic

        // you can call #forward() as often as you want
        context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));

        return null; // only return data via context#forward()
      }
    }
  }
});