How to use flatMapValues on Kafka

I get an error when using flatMapValues in Scala with the Kafka library. Here is my code:

val builder: KStreamBuilder = new KStreamBuilder()
val textLines: KStream[String, String] = builder.stream("streams-plaintext-input")
import collection.JavaConverters.asJavaIterableConverter
val wordCounts: KTable[String, JLong] = textLines
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
  .groupBy((_, word) => word)
  .count("word-counts")

I get the error missing parameter type for textLine in flatMapValues. If I replace it with flatMapValues((textLine: String) => textLine.toLowerCase.split("\\W+").toIterable.asJava), it still doesn't work.

Does anyone have an idea? Thanks, Felipe
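The missing parameter type error usually comes from the fact that flatMapValues does not take a Scala function: it takes Kafka's Java interface ValueMapper, and only Scala 2.12 and later automatically convert a lambda into such a single-abstract-method (SAM) interface. The sketch below illustrates this without a Kafka dependency; `ValueMapperLike`, `flatMapValuesLike` and `SamConversionDemo` are hypothetical stand-ins, not Kafka classes. The anonymous-class form should also compile on Scala 2.11, while the lambda form assumes Scala 2.12+.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for Kafka's Java interface ValueMapper<V, VR>:
// exactly one abstract method, no Scala Function1 involved.
trait ValueMapperLike[V, VR] {
  def apply(value: V): VR
}

object SamConversionDemo {
  // Hypothetical stand-in for KStream#flatMapValues: the parameter is the
  // interface type, so a Scala lambda has to be converted to it.
  def flatMapValuesLike(values: List[String],
                        mapper: ValueMapperLike[String, java.lang.Iterable[String]]): List[String] =
    values.flatMap(v => mapper.apply(v).asScala)

  val lines: List[String] = List("Hello Kafka", "hello Streams")

  // Compiles on Scala 2.11 and 2.12: an explicit anonymous class.
  def withAnonymousClass: List[String] =
    flatMapValuesLike(lines, new ValueMapperLike[String, java.lang.Iterable[String]] {
      override def apply(textLine: String): java.lang.Iterable[String] =
        textLine.toLowerCase.split("\\W+").toIterable.asJava
    })

  // Compiles on Scala 2.12+ only: the lambda is SAM-converted to ValueMapperLike.
  def withLambda: List[String] =
    flatMapValuesLike(lines, textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)

  def main(args: Array[String]): Unit = {
    println(withAnonymousClass) // List(hello, kafka, hello, streams)
    println(withLambda)         // List(hello, kafka, hello, streams)
  }
}
```

With the real Kafka API on Scala 2.11, the same anonymous-class pattern would use `new ValueMapper[String, java.lang.Iterable[String]] { ... }` as the argument to flatMapValues.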

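I solved it with Scala 2.12.4. Unlike 2.11, Scala 2.12 converts a lambda into a Java single-abstract-method interface such as Kafka's ValueMapper (SAM conversion), so the lambda passed to flatMapValues compiles as-is: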

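  import java.util.Properties
  import java.lang.{Long => JLong}
  import scala.collection.JavaConverters._
  import org.apache.kafka.common.serialization.{Serde, Serdes}
  import org.apache.kafka.common.utils.Bytes
  import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig, Topology}
  import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
  import org.apache.kafka.streams.state.KeyValueStore

  val props = new Properties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

  val stringSerde: Serde[String] = Serdes.String()
  // Serdes.Long() is a Serde[java.lang.Long], not a Serde[scala.Long]
  val longSerde: Serde[JLong] = Serdes.Long()

  val builder = new StreamsBuilder()
  val textLines: KStream[String, String] = builder.stream("streams-plaintext-input")

  // Scala 2.12 SAM-converts these lambdas to ValueMapper / KeyValueMapper
  val wordCounts: KTable[String, JLong] = textLines
    .flatMapValues { textLine =>
      println(textLine) // debug: print each incoming line
      textLine.toLowerCase.split("\\W+").toIterable.asJava
    }
    .groupBy((_, word) => word)
    // stateful step: counts are kept in a local state store named "word-counts"
    // (Materialized/Produced replace the deprecated count(String) and to(serde, serde, topic))
    .count(Materialized.as[String, JLong, KeyValueStore[Bytes, Array[Byte]]]("word-counts"))

  wordCounts.toStream.to("streams-wordcount-output", Produced.`with`(stringSerde, longSerde))

  // Build the topology only after every operator has been added to the builder
  val topology: Topology = builder.build()
  println(topology.describe())

  val streams = new KafkaStreams(topology, props)
  streams.start()
  sys.addShutdownHook(streams.close())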
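One way to check the word-splitting logic without a running broker is to pull it out of the lambda into a plain function. The sketch below uses hypothetical names (`WordCountLogic`, `tokenize`, `tokenizeJava`, `expectedCounts`). It also drops empty tokens: when a line starts with a non-word character, `split("\\W+")` returns a leading empty string, which the inline lambda in the topology would pass on and count as a word. `expectedCounts` is only a plain-collections reference for the final per-word counts that groupBy + count should reach for a finite input; it does not model the intermediate updates a KTable emits.

```scala
import scala.collection.JavaConverters._

object WordCountLogic {
  // Same splitting as the flatMapValues lambda, plus a filter that removes the
  // leading "" produced when a line starts with a non-word character.
  def tokenize(textLine: String): List[String] =
    textLine.toLowerCase.split("\\W+").filter(_.nonEmpty).toList

  // Java-facing adapter with the return type flatMapValues expects.
  def tokenizeJava(textLine: String): java.lang.Iterable[String] =
    tokenize(textLine).asJava

  // Reference result: final count per word over a finite set of lines.
  def expectedCounts(lines: Seq[String]): Map[String, Long] =
    lines.flatMap(tokenize).groupBy(identity).map { case (word, ws) => word -> ws.size.toLong }

  def main(args: Array[String]): Unit = {
    println(tokenize("  Hello, Kafka!")) // List(hello, kafka)
    println(expectedCounts(Seq("a b", "A c")))
  }
}
```

On Scala 2.12, the topology could then call `.flatMapValues(textLine => WordCountLogic.tokenizeJava(textLine))` in place of the inline lambda, keeping the Kafka-specific code thin and the logic unit-testable.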