PySpark - Create a PySpark dataframe from a Kafka JSON message
I am using PySpark Structured Streaming to read data from a Kafka topic whose messages are in a complex JSON format.
I am using the Kafka source of Spark Structured Streaming, with the code below -
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder \
    .appName("PythonSparkStreamingKafka") \
    .getOrCreate()

# Read the topic from the earliest offset
kafkaStreamDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "main.test.mysql.main.test_bank_data") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka delivers the value as binary; cast it to a string first
kafkaStreamDF1 = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

# Keep only the Debezium "payload" envelope as a string
message_schema = StructType().add("payload", StringType())
kafkaStreamDF2 = kafkaStreamDF1.select(from_json(col("value"), message_schema).alias("message"))

consoleOutput = kafkaStreamDF2.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()
I have extracted the data from the message down to the payload part of the Kafka JSON message, and its output on the console looks like this -
|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
Now I want to extract the data in the "after" part and read its fields into a dataframe, like this -
transaction_id|account_no|transaction_date|transaction_details|value_date|withdrawal_amt|deposit_amt| balance_amt
20 | 409000611074 | 16/08/2020 | INDO GIBL Indiaforensic STL12071 | 16/08/2020 | 129000.00 | (null) | 7320950.00
21 | 409000611074 | 16/08/2020 | INDO GIBL Indiaforensic STL13071 | 16/08/2020 | 230013.00 | (null) | 7090937.00
Please suggest how I can achieve this expected output in a PySpark dataframe.
Edit - adding the exact value field of the Kafka message below -
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"main.test.mysql.main.test_bank_data.Envelope"},"payload":{"before":null,"after":{"transaction_id":146,"account_no":409000611076,"transaction_date":18652,"transaction_details":"TRF FROM Indiaforensic SERVICES","value_date":18652,"withdrawal_amt":"AA==","deposit_amt":"B6Eg","balance_amt":"B6Eg"},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":1611587463000,"snapshot":"false","db":"main","table":"test_bank_data","server_id":19105,"gtid":null,"file":"binlog.000584","pos":46195052,"row":0,"thread":1604558,"query":null},"op":"c","ts_ms":1611587463181,"transaction":null}}
From here I have already cast the value to a string in DF1 and brought the payload part into DF2.
-- Final working update --
Adding this after applying the SMT transform in the Debezium MySQL connector on the Kafka Connect side. I am now consuming the Kafka Structured Streaming message value in PySpark as below -
Value =
{"transaction_id":21,"account_no":409000611074,"transaction_date":"2020-08-22","transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":"2020-08-22","withdrawal_amt":"230013.00","deposit_amt":null,"balance_amt":"7090937.00"}
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               LongType, StringType)

message_schema = StructType([
    StructField('transaction_id', IntegerType(), True),
    StructField('account_no', LongType(), True),
    StructField('transaction_date', StringType(), True),
    StructField('transaction_details', StringType(), True),
    StructField('value_date', StringType(), True),
    StructField('withdrawal_amt', StringType(), True),
    StructField('deposit_amt', StringType(), True),
    StructField('balance_amt', StringType(), True)
])
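With that schema, the flattened value can be parsed straight into typed columns. A minimal sketch, assuming kafkaStreamDF is the streaming frame from the Kafka source above (flatDF is an illustrative name):

from pyspark.sql import functions as F

# Parse the unwrapped JSON value and expand its fields into columns
flatDF = kafkaStreamDF.select(
    F.from_json(F.col("value").cast("string"), message_schema).alias("m")
).select("m.*")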
You can pass the schema of the JSON message string to the from_json function.
Your message looks like this:
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|value |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
You can modify your code to parse the after field of the JSON as a MapType, then select the keys you want as columns:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, MapType, StringType

message_schema = StructType([
    StructField('before', MapType(StringType(), StringType(), True), True),
    StructField('after', MapType(StringType(), StringType(), True), True),
    StructField('source', MapType(StringType(), StringType(), True), True),
    StructField('op', StringType(), True),
    StructField('ts_ms', StringType(), True),
    StructField('transaction', StringType(), True)
])
after_fields = [
"account_no", "balance_amt", "deposit_amt", "transaction_date",
"transaction_details", "transaction_id", "value_date", "withdrawal_amt"
]
# parse the json strings using from_json and select message.after.*
kafkaStreamDF.withColumn(
    "message",
    F.from_json(F.col("value").cast("string"), message_schema)  # value arrives as binary
).select(
    *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
).writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()
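Since every value in the after map comes out as a string, you would still need to convert the Debezium-encoded fields to get the exact output shown in the question. A sketch under stated assumptions: afterDF stands for the streaming frame produced above, and connect_decimal is an illustrative UDF, not a built-in.

import base64
from decimal import Decimal
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

@F.udf(DecimalType(12, 2))
def connect_decimal(b64):
    # Decode a Kafka Connect Decimal: base64 big-endian unscaled bytes, scale 2
    if b64 is None:
        return None
    return Decimal(int.from_bytes(base64.b64decode(b64), "big", signed=True)).scaleb(-2)

typedDF = afterDF \
    .withColumn("withdrawal_amt", connect_decimal("withdrawal_amt")) \
    .withColumn("deposit_amt", connect_decimal("deposit_amt")) \
    .withColumn("balance_amt", connect_decimal("balance_amt")) \
    .withColumn("transaction_date",
                F.expr("date_add(to_date('1970-01-01'), cast(transaction_date as int))"))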