使用 Kafka Streams 提取 Seq

Extract Seq using Kafka Streams

我正在尝试阅读 Kafka 主题,根据该主题进行一些处理,然后将结果存储在另一个主题中。

我的代码如下所示:

builder
   .stream(settings.Streams.inputTopic)
   .mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e))
   // Something needs to be done here...
   .to(settings.Streams.outputTopic)

fx(e) 函数进行一些处理并且 return 是一个 Seq[Product]。我想将所有产品存储为主题中的单独条目。问题是从主题中读取的消息包含多个产品,因此 fx(e) 的 return 值。

是否可以将此行为嵌入到流中?

使用flatMapValues代替mapValues

import scala.collection.JavaConverters.asJavaIterableConverter

builder
  .stream(settings.Streams.inputTopic)
  .flatMapValues(e => fx(e).toIterable.asJava)
  .to(settings.Streams.outputTopic)