使用 Kafka Streams 提取 Seq
Extract Seq using Kafka Streams
我正在尝试阅读 Kafka 主题,根据该主题进行一些处理,然后将结果存储在另一个主题中。
我的代码如下所示:
builder
.stream(settings.Streams.inputTopic)
.mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e))
// Something needs to be done here...
.to(settings.Streams.outputTopic)
fx(e)
函数进行一些处理并且 return 是一个 Seq[Product]
。我想将所有产品存储为主题中的单独条目。问题是从主题中读取的消息包含多个产品,因此 fx(e)
的 return 值。
是否可以将此行为嵌入到流中?
使用flatMapValues
代替mapValues
:
import scala.collection.JavaConverters.asJavaIterableConverter
builder
.stream(settings.Streams.inputTopic)
.flatMapValues(e => fx(e).toIterable.asJava)
.to(settings.Streams.outputTopic)
我正在尝试阅读 Kafka 主题,根据该主题进行一些处理,然后将结果存储在另一个主题中。
我的代码如下所示:
builder
.stream(settings.Streams.inputTopic)
.mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e))
// Something needs to be done here...
.to(settings.Streams.outputTopic)
fx(e)
函数进行一些处理并且 return 是一个 Seq[Product]
。我想将所有产品存储为主题中的单独条目。问题是从主题中读取的消息包含多个产品,因此 fx(e)
的 return 值。
是否可以将此行为嵌入到流中?
使用flatMapValues
代替mapValues
:
import scala.collection.JavaConverters.asJavaIterableConverter
builder
.stream(settings.Streams.inputTopic)
.flatMapValues(e => fx(e).toIterable.asJava)
.to(settings.Streams.outputTopic)