如何将字节从 Kafka 转换为其原始对象?

How to convert bytes from Kafka to their original object?

我正在从 Kafka 获取数据,然后使用默认解码器反序列化 Array[Byte],之后我的 RDD 元素看起来像 (null,[B@406fa9b2)(null,[B@21a9fe0) 但我想要我的原始数据有一个模式,那么我该如何实现呢?

我以 Avro 格式序列化消息。

您必须使用适当的反序列化器对字节进行解码,比如字符串或您的自定义对象。

如果你不进行解码,你会得到 [B@406fa9b2 那只是 Java.

中字节数组的文本表示

Kafka 对您的消息内容一无所知,因此它将字节数组从生产者传递给消费者。

在 Spark Streaming 中,您必须对键和值使用序列化程序(引用 KafkaWordCount example):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

使用上面的序列化程序你会得到 DStream[String] 所以你可以使用 RDD[String].

但是,如果您想将字节数组反序列化为您的自定义 class 直接,您必须编写一个自定义 Serializer(这是 Kafka 特定的,与 Spark 无关)。

我推荐的是将 JSON 与固定模式或 Avro(使用 Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages 中描述的解决方案)一起使用。


Structured Streaming 中,管道可能如下所示:

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here