使用 PySpark Streaming 反序列化 Kafka json 消息

Deserialize Kafka json message with PySpark Streaming

我有一个 pyspark 应用程序正在使用来自 Kafka 主题的消息,这些消息由 org.apache.kafka.connect.json.JsonConverter 序列化。我正在使用 confluent Kafka JDBC 连接器来执行此操作

问题是,当我使用消息时,ID 列出现在某种编码文本中,例如 "ARM=",而它应该是数字类型。

这是我现在的代码

spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)

kafka_params = {
    "bootstrap.servers": "kafkahost:9092",
    "group.id": "Deserialize"
}

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))

ssc.start()
ssc.awaitTermination()

我知道 createDirectStream 有一个我可以设置的 valueDecoder 参数,问题是我不知道如何使用它进行解码。我也事先知道架构,因此如果需要我可以创建一个。

作为参考,这是我在打印 rdd.foreach

时得到的 JSON
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0"
        },
        "field": "ID"
      },
      {
        "type": "string",
        "optional": True,
        "field": "COLUMN1"
      }
    ],
    "optional": False
  },
  "payload": {
    "ID": "AOo=",
    "COLUMN1": "some string"
  }
}

在您的 Connect 配置中,您可以设置 value.converter.schema.enable=false,然后您将只能获得该 JSON 记录的 "payload" 数据。

从那里开始,我假设您将能够根据 PySpark 中阅读流 JSON 的任何其他示例来处理消息。

否则,由于您没有使用结构化流,因此没有可供您定义的架构。相反,您至少必须做这样的事情来解析记录

rdd.map(lambda x: json.loads(x))\
    .map(lambda x: x['payload'])\
    .foreach(lambda x: print(x))

所以正如cricket_007提到的,在你的融合Kafka配置中,你必须将设置设置为value.converter.schema.enable=false。这将摆脱 Schema 字段,只留下有效负载 json。现在出于某种原因,我遇到了一个问题,我的所有数字列都将以这种奇怪的格式 AOo= 进行编码。现在,当使用 Json 序列化您的数据时,confluent 将使用 base64 转换您的数字列,但真正的问题甚至在此之前。出于某种原因,我所有的数字列都被转换为字节。不确定为什么要这样做,但它与 confluent 处理 Oracle 数据库的方式有关。无论如何,解决这个问题的方法是在 createDirectStream 中设置一个值解码器,例如

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)

并且在你的解码器方法中,你必须从 UTF-8 解码你的消息,解析 json 然后从 base64 解码你的数字列,然后从这样的字节解码你的数字列

def decoder(s):
    if s is None:
        return None

    loaded_json = json.loads(s.decode('utf-8'))
    loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
    return loaded_json