Unable to deserialize Avro messages using Spark Structured Streaming when the key is String-serialized and the value is Avro
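I am using Spark 2.4.0.
The schema is fetched from the Confluent Schema Registry.
The message key is serialized as a String and the value as Avro, so I am trying to deserialize the value with io.confluent.kafka.serializers.KafkaAvroDeserializer, but it does not work. Can anyone look at my code and see what is wrong?
Imported libraries: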
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{ Encoder, SparkSession}
Code body:
val topics = "test_topic"
val spark: SparkSession = SparkSession.builder
  .config("spark.streaming.stopGracefullyOnShutdown", "true")
  .config("spark.streaming.backpressure.enabled", "true")
  .config("spark.streaming.kafka.maxRatePerPartition", 2170)
  .config("spark.streaming.kafka.maxRetries", 1)
  .config("spark.streaming.kafka.consumer.poll.ms", "600000")
  .appName("SparkStructuredStreamAvro")
  .config("spark.sql.streaming.checkpointLocation", "/tmp/new_checkpoint/")
  .enableHiveSupport()
  .getOrCreate
//add settings for schema registry url, used to get deser
val schemaRegUrl = "http://xx.xx.xx.xxx:xxxx"
val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
//subscribe to kafka
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xx.xx.xxxx")
  .option("subscribe", "test.topic")
  .option("kafka.startingOffsets", "latest")
  .option("group.id", "use_a_separate_group_id_for_each_stream")
  .load()
//add confluent kafka avro deserializer, needed to read messages appropriately
val deser = new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]]
//needed to convert column select into Array[Bytes]
import spark.implicits._
val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
  //read the raw bytes from spark and then use the confluent deserializer to get the record back
  val decoded = deser.deserialize(topics, rawBytes)
  val recordId = decoded.get("nameId").asInstanceOf[org.apache.avro.util.Utf8].toString
  recordId
}
results.writeStream
  .outputMode("append")
  .format("text")
  .option("path", "/tmp/path_new/")
  .option("truncate", "false")
  .start()
  .awaitTermination()
spark.stop()
Deserialization fails with the following error:
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDeserializer
Serialization stack:
- object not serializable (class: io.confluent.kafka.serializers.KafkaAvroDeserializer, value: io.confluent.kafka.serializers.KafkaAvroDeserializer@591024db)
- field (class: ca.bell.wireless.ingest$$anonfun, name: deser, type: interface org.apache.kafka.common.serialization.Deserializer)
- object (class ca.bell.wireless.ingest$$anonfun, <function1>)
- element of array (index: 1)
It works fine when I write a plain Kafka consumer (not through Spark) using:
props.put("key.deserializer", classOf[StringDeserializer])
props.put("value.deserializer", classOf[KafkaAvroDeserializer])
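You defined the variable ('deser') that holds the KafkaAvroDeserializer outside the map block.
Spark has to serialize the function passed to map, together with every variable it captures, in order to ship it to the executors. KafkaAvroDeserializer is not serializable, which causes the exception.
Try changing the code like this: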
val brdDeser = spark.sparkContext.broadcast(new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]])
val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
  val deser = brdDeser.value
  val decoded = deser.deserialize(topics, rawBytes)
  val recordId = decoded.get("nameId").asInstanceOf[org.apache.avro.util.Utf8].toString
  recordId
}
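To see why the captured variable matters, here is a minimal sketch that reproduces the failure without Spark or Kafka. It only uses the Java standard library. FakeDeserializer, ClosureDemo, canSerialize, capturing and selfContained are hypothetical names made up for this illustration; FakeDeserializer merely stands in for a non-serializable class such as KafkaAvroDeserializer, and canSerialize only approximates the check Spark performs on a closure before sending it to executors.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for KafkaAvroDeserializer: it does NOT extend Serializable.
class FakeDeserializer {
  def deserialize(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

object ClosureDemo {
  // Approximates what happens to a closure before it is shipped to executors:
  // the function object is written with Java serialization.
  def canSerialize(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Failing pattern: the deserializer is created outside the function,
  // so the function captures it and must serialize it as well.
  def capturing(): Array[Byte] => String = {
    val deser = new FakeDeserializer
    (bytes: Array[Byte]) => deser.deserialize(bytes)
  }

  // Working pattern: the deserializer is created inside the function,
  // so nothing non-serializable is captured.
  def selfContained(): Array[Byte] => String =
    (bytes: Array[Byte]) => new FakeDeserializer().deserialize(bytes)

  def main(args: Array[String]): Unit = {
    println(s"capturing closure serializable: ${canSerialize(capturing())}")
    println(s"self-contained closure serializable: ${canSerialize(selfContained())}")
  }
}
```

In the Spark job, the same idea would mean constructing the CachedSchemaRegistryClient and KafkaAvroDeserializer inside the function that runs on the executors, for example once per partition inside mapPartitions, rather than capturing an instance built on the driver. Note that a broadcast variable is also serialized when it is sent to executors, so wrapping a non-serializable object in a broadcast may still hit the same exception under the default Java serializer.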