使用 Spark Structured Streaming (pyspark) 从 Kafka Connect JSONConverter 消息中提取 "payload"(模式和负载)
Extracting "payload" from Kafka Connect JSONConverter messages with (schema & payload) using Spark Structured Streaming (pyspark)
然而,我想要完成的正是这个问题关于 () 的内容;就我而言,我使用的是 Python/Pyspark 而不是 Scala。
我正在尝试提取也包含模式的 Kafka 连接消息的“有效负载”部分。
示例消息:
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
第 1 步 - 为“有效负载”部分定义架构:
payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])
第 2 步 - 从 Kafka 读取:
df =spark.readStream.format("kafka")
第 3 步 - 从 Kafka 消息中获取消息值:
kafka_df = df.selectExpr("CAST(value AS STRING)")
第 4 步 - 仅提取“有效载荷”(我卡在这里):
import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
.select("DF.*")
我被困在这部分,因为我不知道如何在将 JSON 字符串传递给 from_json() 函数之前从中提取有效负载。
注意 :我知道我需要为整个消息定义完整的模式,然后才能在 from_json() 中使用它,然而;我试图只获取“有效负载”json 字符串部分。
您可以使用 SQL 函数 get_json_object
:
import pyspark.sql.functions as psf
kafka_df
.select(psf.get_json_object(kafka_df['value'],"$.payload").alias('payload'))
.select(psf.from_json(psf.col('payload'), payload_schema).alias("DF"))
.select("DF.*")
或者,您需要先定义整个消息的完整架构,然后才能在 from_json
中使用它。
这意味着您的架构应该如下所示:
full_schema = StructType([
StructField("schema", StructType([
StructField("type", StringType(), False),
StructField("name", StringType(), False),
StructField("fields", StructType([
StructField("field", StringType(), False),
StructField("type", StringType(), False)
]),
StructField("payload", StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)
])
])
请仔细检查此模式定义,因为我不完全确定如何在 Python 中的模式中定义数组,但我希望思路清晰。
完成后,您可以通过
select 有效负载字段
import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), full_schema).alias("DF"))\
.select("DF.payload.*")
出于某种原因,我错过了 pyspark 具有 get_json_object() 功能。
在迈克发表评论后,我回到 documentation 我找到了我要找的东西。
这是答案:
kafka_df = df.selectExpr("CAST(value AS STRING)")
payload_df = kafka_df.select(psf.get_json_object(kafka_df.value, "$.payload").alias("payload"))
emp_df = payload_df.select(psf.from_json(psf.col('payload'), schema).alias("DF")).select("DF.*")
然而,我想要完成的正是这个问题关于 (
我正在尝试提取也包含模式的 Kafka 连接消息的“有效负载”部分。
示例消息:
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
第 1 步 - 为“有效负载”部分定义架构:
payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])
第 2 步 - 从 Kafka 读取:
df =spark.readStream.format("kafka")
第 3 步 - 从 Kafka 消息中获取消息值:
kafka_df = df.selectExpr("CAST(value AS STRING)")
第 4 步 - 仅提取“有效载荷”(我卡在这里):
import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
.select("DF.*")
我被困在这部分,因为我不知道如何在将 JSON 字符串传递给 from_json() 函数之前从中提取有效负载。
注意 :我知道我需要为整个消息定义完整的模式,然后才能在 from_json() 中使用它,然而;我试图只获取“有效负载”json 字符串部分。
您可以使用 SQL 函数 get_json_object
:
import pyspark.sql.functions as psf
kafka_df
.select(psf.get_json_object(kafka_df['value'],"$.payload").alias('payload'))
.select(psf.from_json(psf.col('payload'), payload_schema).alias("DF"))
.select("DF.*")
或者,您需要先定义整个消息的完整架构,然后才能在 from_json
中使用它。
这意味着您的架构应该如下所示:
full_schema = StructType([
StructField("schema", StructType([
StructField("type", StringType(), False),
StructField("name", StringType(), False),
StructField("fields", StructType([
StructField("field", StringType(), False),
StructField("type", StringType(), False)
]),
StructField("payload", StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)
])
])
请仔细检查此模式定义,因为我不完全确定如何在 Python 中的模式中定义数组,但我希望思路清晰。
完成后,您可以通过
select 有效负载字段import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), full_schema).alias("DF"))\
.select("DF.payload.*")
出于某种原因,我错过了 pyspark 具有 get_json_object() 功能。 在迈克发表评论后,我回到 documentation 我找到了我要找的东西。
这是答案:
kafka_df = df.selectExpr("CAST(value AS STRING)")
payload_df = kafka_df.select(psf.get_json_object(kafka_df.value, "$.payload").alias("payload"))
emp_df = payload_df.select(psf.from_json(psf.col('payload'), schema).alias("DF")).select("DF.*")