Deserialising Avro formatted data from Kafka in Spark Streaming gives empty String and 0 for long
I am struggling to deserialize Avro-serialized data coming from Kafka in Spark Streaming.
This is the file I run via spark-submit:
package com.example.mymessage

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object MyMessageCount extends Logging {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: MyMessageCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN]." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("MyMessageCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD(rdd => {
      rdd.foreach(avroRecord => {
        val schemaString = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"string\",\"type\":\"string\"},{\"name\":\"long\",\"type\":\"long\"}]}"
        val parser = new Schema.Parser()
        val schema = parser.parse(schemaString)

        val reader = new GenericDatumReader[GenericRecord](schema)
        val decoder = DecoderFactory.get.binaryDecoder(avroRecord.toCharArray.map(_.toByte), null)
        val record: GenericRecord = reader.read(null, decoder)

        System.out.println(avroRecord + "," + record.toString
          + ", string= " + record.get("string")
          + ", long=" + record.get("long"))
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
I have been sending data to it locally using the Confluent platform.
If I send:
{"string":"test","long":30}
then the code above outputs:
test<,{"string": "", "long": 0}, string= , long=0
This tells me the data is getting through, but for some reason the string and long values come out looking like default values. How do I access the real "string" and "long" values that arrive in avroRecord from Kafka?
Using Confluent's KafkaAvroDecoder with a direct stream solved this.
import io.confluent.kafka.serializers.KafkaAvroDecoder
...
val kafkaParams = Map[String, String]("metadata.broker.list" -> zkQuorum,
  "schema.registry.url" -> schemaRegistry,
  "auto.offset.reset" -> "smallest")
val topicSet = Set(topics)
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicSet).map(_._2)

val lines = messages.foreachRDD(rdd => {
  rdd.foreach({ avroRecord =>
    println(avroRecord)
  })
})
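The decoder hands the value back typed as Object; to read individual fields it can be cast to a GenericRecord. A minimal sketch of that step (not from the original code; the field names simply mirror the schema above):

import org.apache.avro.generic.GenericRecord

messages.foreachRDD { rdd =>
  rdd.foreach { avroRecord =>
    // KafkaAvroDecoder deserialises using the schema from the schema registry;
    // for a record schema the returned Object is a GenericRecord underneath.
    val record = avroRecord.asInstanceOf[GenericRecord]
    println("string=" + record.get("string") + ", long=" + record.get("long"))
  }
}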
I ran into a separate issue where I could only pull in version 1 of the serializer, not a newer version.
libraryDependencies ++= Seq(
  "io.confluent" % "kafka-avro-serializer" % "1.0",
  ...
)

resolvers ++= Seq(
  Resolver.sonatypeRepo("public"),
  Resolver.url("confluent", url("http://packages.confluent.io/maven/"))
)
UPDATE: The following worked to pull in the latest version of kafka-avro-serializer.
libraryDependencies ++= Seq(
  "io.confluent" % "kafka-avro-serializer" % "3.0.0",
  ...
)

resolvers ++= Seq(
  Resolver.sonatypeRepo("public"),
  "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
)
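For what it's worth, the empty string and zero in the original output are most likely explained by Confluent's wire format: KafkaAvroSerializer prepends a 5-byte header (a magic byte 0x00 plus a 4-byte schema id) to the Avro binary payload. Decoded against the bare schema, the leading zero byte reads as a zero-length string and the following byte as 0, which matches the output above; round-tripping the payload through a String can additionally mangle the remaining bytes. If you wanted to decode the raw bytes yourself instead of using KafkaAvroDecoder, a minimal sketch, assuming the producer is Confluent's KafkaAvroSerializer and the stream is set up to deliver Array[Byte] values (the helper name here is just for illustration):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Skips the 5-byte Confluent header (magic byte + schema id) and decodes
// the remainder of the payload as Avro binary against the given schema.
def decodeConfluentAvro(payload: Array[Byte], schema: Schema): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get.binaryDecoder(payload, 5, payload.length - 5, null)
  reader.read(null, decoder)
}

This is essentially what KafkaAvroDecoder does, except that it also looks up the writer schema in the schema registry using the id from the header rather than assuming a fixed schema.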