使用 PySpark Streaming 反序列化 Kafka json 消息
Deserialize Kafka json message with PySpark Streaming
我有一个 pyspark 应用程序正在使用来自 Kafka 主题的消息,这些消息由 org.apache.kafka.connect.json.JsonConverter
序列化。我正在使用 confluent Kafka JDBC 连接器来执行此操作
问题是,当我使用消息时,ID 列出现在某种编码文本中,例如 "ARM=",而它应该是数字类型。
这是我现在的代码
spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)
kafka_params = {
"bootstrap.servers": "kafkahost:9092",
"group.id": "Deserialize"
}
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))
ssc.start()
ssc.awaitTermination()
我知道 createDirectStream 有一个我可以设置的 valueDecoder 参数,问题是我不知道如何使用它进行解码。我也事先知道架构,因此如果需要我可以创建一个。
作为参考,这是我在打印 rdd.foreach
时得到的 JSON
{
"schema": {
"type": "struct",
"fields": [
{
"type": "bytes",
"optional": False,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0"
},
"field": "ID"
},
{
"type": "string",
"optional": True,
"field": "COLUMN1"
}
],
"optional": False
},
"payload": {
"ID": "AOo=",
"COLUMN1": "some string"
}
}
在您的 Connect 配置中,您可以设置 value.converter.schema.enable=false
,然后您将只能获得该 JSON 记录的 "payload" 数据。
从那里开始,我假设您将能够根据 PySpark 中阅读流 JSON 的任何其他示例来处理消息。
否则,由于您没有使用结构化流,因此没有可供您定义的架构。相反,您至少必须做这样的事情来解析记录
rdd.map(lambda x: json.loads(x))\
.map(lambda x: x['payload'])\
.foreach(lambda x: print(x))
所以正如cricket_007提到的,在你的融合Kafka配置中,你必须将设置设置为value.converter.schema.enable=false
。这将摆脱 Schema 字段,只留下有效负载 json。现在出于某种原因,我遇到了一个问题,我的所有数字列都将以这种奇怪的格式 AOo=
进行编码。现在,当使用 Json 序列化您的数据时,confluent 将使用 base64 转换您的数字列,但真正的问题甚至在此之前。出于某种原因,我所有的数字列都被转换为字节。不确定为什么要这样做,但它与 confluent 处理 Oracle 数据库的方式有关。无论如何,解决这个问题的方法是在 createDirectStream
中设置一个值解码器,例如
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)
并且在你的解码器方法中,你必须从 UTF-8 解码你的消息,解析 json 然后从 base64 解码你的数字列,然后从这样的字节解码你的数字列
def decoder(s):
if s is None:
return None
loaded_json = json.loads(s.decode('utf-8'))
loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
return loaded_json
我有一个 pyspark 应用程序正在使用来自 Kafka 主题的消息,这些消息由 org.apache.kafka.connect.json.JsonConverter
序列化。我正在使用 confluent Kafka JDBC 连接器来执行此操作
问题是,当我使用消息时,ID 列出现在某种编码文本中,例如 "ARM=",而它应该是数字类型。
这是我现在的代码
spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)
kafka_params = {
"bootstrap.servers": "kafkahost:9092",
"group.id": "Deserialize"
}
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))
ssc.start()
ssc.awaitTermination()
我知道 createDirectStream 有一个我可以设置的 valueDecoder 参数,问题是我不知道如何使用它进行解码。我也事先知道架构,因此如果需要我可以创建一个。
作为参考,这是我在打印 rdd.foreach
时得到的 JSON{
"schema": {
"type": "struct",
"fields": [
{
"type": "bytes",
"optional": False,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0"
},
"field": "ID"
},
{
"type": "string",
"optional": True,
"field": "COLUMN1"
}
],
"optional": False
},
"payload": {
"ID": "AOo=",
"COLUMN1": "some string"
}
}
在您的 Connect 配置中,您可以设置 value.converter.schema.enable=false
,然后您将只能获得该 JSON 记录的 "payload" 数据。
从那里开始,我假设您将能够根据 PySpark 中阅读流 JSON 的任何其他示例来处理消息。
否则,由于您没有使用结构化流,因此没有可供您定义的架构。相反,您至少必须做这样的事情来解析记录
rdd.map(lambda x: json.loads(x))\
.map(lambda x: x['payload'])\
.foreach(lambda x: print(x))
所以正如cricket_007提到的,在你的融合Kafka配置中,你必须将设置设置为value.converter.schema.enable=false
。这将摆脱 Schema 字段,只留下有效负载 json。现在出于某种原因,我遇到了一个问题,我的所有数字列都将以这种奇怪的格式 AOo=
进行编码。现在,当使用 Json 序列化您的数据时,confluent 将使用 base64 转换您的数字列,但真正的问题甚至在此之前。出于某种原因,我所有的数字列都被转换为字节。不确定为什么要这样做,但它与 confluent 处理 Oracle 数据库的方式有关。无论如何,解决这个问题的方法是在 createDirectStream
中设置一个值解码器,例如
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)
并且在你的解码器方法中,你必须从 UTF-8 解码你的消息,解析 json 然后从 base64 解码你的数字列,然后从这样的字节解码你的数字列
def decoder(s):
if s is None:
return None
loaded_json = json.loads(s.decode('utf-8'))
loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
return loaded_json