How to modify KStream key and values in Kafka word count program?
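I am new to Kafka Streams and am stuck on a basic word count program. In the program below I try to lowercase the values, but it does not work (val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase)). What is wrong here?
Kafka Streams version => 2.3.0
Scala version => 2.11.8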
import java.util._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.{KafkaStreams,StreamsBuilder, StreamsConfig}
import org.apache.kafka.common.serialization.{StringDeserializer,LongDeserializer}

object WordCount {
  def main(args: Array[String]): Unit = {
    val config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    config.put(ConsumerConfig.AutoOffsetReset,"earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringDeserializer])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringDeserializer])
    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")
    val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase)
    wordCountInputProcessed.to("streams-plaintext-output")
    val streams = new KafkaStreams(builder.build(),config)
    streams.start()
    println(streams.toString)
  }
}
Here is a snapshot of the issue.
Shouldn't the type be String rather than Nothing?
You have to reassign the transformed KStream to the KStream var wordCountInput; otherwise wordCountInput still refers to the initial KStream. Like this:
wordCountInput = wordCountInput.mapValues(value => value.toLowerCase)
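The point about reassignment can be sketched as follows. This is a minimal illustration, not the original poster's code: it assumes the kafka-streams-scala artifact (matching the 2.3.0 version mentioned in the question) is on the classpath, and the object name ReassignSketch and the helper buildDescription are hypothetical. In newer releases the implicit Serdes may live under a different package. The sketch only builds and describes the topology, so no broker is needed.

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical name, for illustration only.
object ReassignSketch {

  // Builds a small topology and returns its textual description.
  def buildDescription(): String = {
    val builder = new StreamsBuilder

    // The source stream; calling mapValues on it does not change it.
    val input: KStream[String, String] =
      builder.stream[String, String]("streams-plaintext-input")

    // mapValues returns a NEW stream; later steps must use this reference.
    val lowered: KStream[String, String] =
      input.mapValues(value => value.toLowerCase)

    // Writing `input.to(...)` here instead would forward the untouched values.
    lowered.to("streams-plaintext-output")

    builder.build().describe().toString
  }

  def main(args: Array[String]): Unit =
    println(buildDescription())
}
```

Binding the result to a new val, as here, or reassigning a var both work; what matters is that the sink is attached to the stream returned by mapValues.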
Updated
I made a few other changes and the application runs fine.
- Kafka Streams uses Serdes classes that wrap StringSerializer/StringDeserializer, so change the serde class config from StringSerializer/StringDeserializer to StringSerde:
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringSerde])
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringSerde])
- An extra tip: it is easier to check whether new messages are arriving if you put debug output into your Stream DSL. I usually debug like this:
val wordCountInputProcessed = wordCountInput
  .mapValues(value => {
    println("origin " + value)
    println("lowercase " + value.toLowerCase)
    value.toLowerCase
  })
You can also put the debug output inside mapValues.
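To illustrate the first point in the list above, here is a small sketch of what a Serde bundles. It assumes only the kafka-clients library on the classpath; the object name SerdeSketch, the helper roundTrip, and the topic name "any-topic" are hypothetical and exist only for this example. The topic argument is required by the serializer interface.

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}

// Hypothetical name, for illustration only.
object SerdeSketch {

  // Serializes a string to bytes and reads it back through one Serde.
  def roundTrip(text: String): String = {
    // A Serde[String] carries both a serializer and a deserializer.
    val serde: Serde[String] = Serdes.String()

    val bytes: Array[Byte] = serde.serializer().serialize("any-topic", text)
    serde.deserializer().deserialize("any-topic", bytes)
  }

  def main(args: Array[String]): Unit =
    println(roundTrip("Hello Kafka"))
}
```

This is why the default serde config keys expect a class such as StringSerde rather than a bare StringDeserializer: Kafka Streams needs both directions for each key and value type.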
Update 1
Here is the updated full application:
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

object WordCount {
  def main(args: Array[String]): Unit = {
    val config = new Properties
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[StringSerde])
    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")
    val wordCountInputProcessed = wordCountInput
      .mapValues(value => {
        println("origin " + value)
        value.toLowerCase
      })
    wordCountInputProcessed.mapValues(value => {
      println("lowercase " + value)
      value
    })
    wordCountInputProcessed.to("streams-plaintext-output")
    val streams = new KafkaStreams(builder.build(), config)
    streams.start()
    println(streams.toString)
  }
}
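I switched from the Java API to the Scala API of the Kafka Streams DSL, and that solved the problem. I am also using the following modules, for the reasons given.
org.apache.kafka.streams.scala.ImplicitConversions: module that brings into scope the implicit conversions between the Scala and Java classes.
org.apache.kafka.streams.scala.Serdes: module that contains all primitive Serdes, which can be imported as implicits, plus a helper for creating custom Serdes.
See this documentation for more details (topic: KAFKA STREAMS DSL FOR SCALA) => https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#scala-dsl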
import java.time.Duration
import java.util._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
// Import for Scala DSL
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

object WordCount {
  def main(args: Array[String]): Unit = {
    val config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[Serdes.StringSerde])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[Serdes.LongSerde])
    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")
    val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase())
      .flatMapValues(x => x.split(" "))
      .selectKey((key, value) => value)
      .groupByKey
      .count
    wordCountInputProcessed.toStream.to("streams-plaintext-output")
    val streams = new KafkaStreams(builder.build(), config)
    streams.start()
    println(streams.toString)
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }
}
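To make the pipeline above easier to follow, here is a rough sketch of the same steps on plain Scala collections. It is not Kafka Streams code and ignores streaming concerns such as incremental updates and state stores; the object name WordCountLogicSketch and the function countWords are hypothetical. It only shows what each DSL step contributes to the final counts.

```scala
// Hypothetical name, for illustration only; uses the Scala standard library.
object WordCountLogicSketch {

  // Mirrors the topology steps on an in-memory batch of lines.
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .map(_.toLowerCase)        // like mapValues(value => value.toLowerCase())
      .flatMap(_.split(" "))     // like flatMapValues(x => x.split(" "))
      .groupBy(identity)         // like selectKey((key, value) => value) + groupByKey
      .map { case (word, occurrences) =>
        word -> occurrences.size.toLong // like count
      }

  def main(args: Array[String]): Unit =
    println(countWords(Seq("Hello world", "hello Kafka")))
}
```

Note that the counts are Long values, which is why the Kafka Streams version writes the output topic with a Long serde for the values.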