如何将字节从 Kafka 转换为其原始对象?
How to convert bytes from Kafka to their original object?
我正在从 Kafka 获取数据,然后使用默认解码器反序列化 Array[Byte]
,之后我的 RDD 元素看起来像 (null,[B@406fa9b2)
、(null,[B@21a9fe0)
但我想要我的原始数据有一个模式,那么我该如何实现呢?
我以 Avro 格式序列化消息。
您必须使用适当的反序列化器对字节进行解码,比如字符串或您的自定义对象。
如果你不进行解码,你会得到 [B@406fa9b2
那只是 Java.
中字节数组的文本表示
Kafka 对您的消息内容一无所知,因此它将字节数组从生产者传递给消费者。
在 Spark Streaming 中,您必须对键和值使用序列化程序(引用 KafkaWordCount example):
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
使用上面的序列化程序你会得到 DStream[String]
所以你可以使用 RDD[String]
.
但是,如果您想将字节数组反序列化为您的自定义 class 直接,您必须编写一个自定义 Serializer(这是 Kafka 特定的,与 Spark 无关)。
我推荐的是将 JSON 与固定模式或 Avro(使用 Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages 中描述的解决方案)一起使用。
在 Structured Streaming 中,管道可能如下所示:
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string") // <-- conversion here
我正在从 Kafka 获取数据,然后使用默认解码器反序列化 Array[Byte]
,之后我的 RDD 元素看起来像 (null,[B@406fa9b2)
、(null,[B@21a9fe0)
但我想要我的原始数据有一个模式,那么我该如何实现呢?
我以 Avro 格式序列化消息。
您必须使用适当的反序列化器对字节进行解码,比如字符串或您的自定义对象。
如果你不进行解码,你会得到 [B@406fa9b2
那只是 Java.
Kafka 对您的消息内容一无所知,因此它将字节数组从生产者传递给消费者。
在 Spark Streaming 中,您必须对键和值使用序列化程序(引用 KafkaWordCount example):
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
使用上面的序列化程序你会得到 DStream[String]
所以你可以使用 RDD[String]
.
但是,如果您想将字节数组反序列化为您的自定义 class 直接,您必须编写一个自定义 Serializer(这是 Kafka 特定的,与 Spark 无关)。
我推荐的是将 JSON 与固定模式或 Avro(使用 Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages 中描述的解决方案)一起使用。
在 Structured Streaming 中,管道可能如下所示:
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string") // <-- conversion here