How to use flatMapValues on Kafka
I am getting an error when using flatMapValues in Scala with the Kafka library. Here is my code:
val builder: KStreamBuilder = new KStreamBuilder()
val textLines: KStream[String, String] = builder.stream("streams-plaintext-input")
import collection.JavaConverters.asJavaIterableConverter
val wordCounts: KTable[String, JLong] = textLines
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
  .groupBy((_, word) => word)
  .count("word-counts")
I get the error "missing parameter type" for textLine in flatMapValues. If I replace it with flatMapValues((textLine: String) => textLine.toLowerCase.split("\\W+").toIterable.asJava) it still does not work.
Does anyone have an idea?
Thanks, Felipe
Using Scala 2.12.4, I solved it like this:
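import java.lang.{Long => JLong}
import java.util.Properties
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig, Topology}
import org.apache.kafka.streams.kstream.{KStream, KTable}
import scala.collection.JavaConverters._

val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
val stringSerde: Serde[String] = Serdes.String()
// Serdes.Long() is a serde for java.lang.Long, hence the JLong alias
val longSerde: Serde[JLong] = Serdes.Long()
val builder = new StreamsBuilder()
val textLines: KStream[String, String] = builder.stream("streams-plaintext-input")
val wordCounts: KTable[String, JLong] = textLines
  .flatMapValues { textLine =>
    println(textLine) // debug output for each incoming line
    textLine.toLowerCase.split("\\W+").toIterable.asJava
  }
  .groupBy((_, word) => word)
  // count is a stateful operation: it adds a state store named "word-counts" to the topology
  .count("word-counts")
// count(String) and KTable.to(...) are the Kafka Streams 1.0.x overloads; newer releases
// use count(Materialized.as(...)) and toStream.to(topic, Produced.`with`(...)) instead
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output")
// Build and print the topology once all processors have been added
val topology: Topology = builder.build()
println(topology.describe())
val streams = new KafkaStreams(topology, props)
streams.start()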
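A note for anyone who cannot move to Scala 2.12: "missing parameter type" is what older Scala versions typically report when a function literal is passed where a Java functional interface such as ValueMapper is expected, because lambdas are only converted to Java single-method interfaces by default from 2.12 on. A minimal sketch of a workaround, assuming kafka-streams is on the classpath, is to spell the mapper out as an anonymous class. The names ExplicitMapperSketch, splitWords and words are hypothetical and only serve the illustration.

```scala
import java.lang.{Iterable => JIterable}
import org.apache.kafka.streams.kstream.{KStream, ValueMapper}
import scala.collection.JavaConverters._

// Hypothetical wrapper object, not part of the Kafka API
object ExplicitMapperSketch {

  // An explicit ValueMapper instance: no lambda-to-interface conversion is needed,
  // so the compiler does not have to infer the type of textLine
  val splitWords: ValueMapper[String, JIterable[String]] =
    new ValueMapper[String, JIterable[String]] {
      override def apply(textLine: String): JIterable[String] =
        // Lower-case the line and split on runs of non-word characters
        textLine.toLowerCase.split("\\W+").toIterable.asJava
    }

  // Applies the mapper to a stream of text lines, producing one record per word
  def words(textLines: KStream[String, String]): KStream[String, String] =
    textLines.flatMapValues[String](splitWords)
}
```

With this in place, the chain from the question would start with ExplicitMapperSketch.words(textLines) and continue with groupBy and count. Note that split("\\W+") yields an empty first token when a line begins with punctuation or whitespace, so filtering out empty strings before counting may be worthwhile.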