如何将 PySpark Dataframe 列的类型指定为 JSON
How to specify the type of PySpark Dataframe column as JSON
以下是我们的 pyspark 应用程序代码片段。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.show()
+-------+--------+---------------------------------------------+---+
| name| version| requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test| 1|{"data": {"score": 130, "group": "silver"}} | 1|
|kj-test| 1|{"data": {"score": 250, "group": "gold"}} | 2|
|kj-test| 1|{"data": {"score": 330, "group": "platinum"}}| 3|
+-------+--------+---------------------------------------------+---+
decrypt_udf UDF 函数片段:
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return decrypted_json_str
当我将 spark dataframe 写入 S3 存储桶时,如下所示
df_new.write.mode('overwrite').json(path=s3outputpath)
生成的文件内容如下,这里 requestBody
的值写成 String
因此在双引号中并且也转义了内部双引号。
{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 130, \"group\": \"silver\"}}","id":"1"}
{"name":"kj-test","version":"2","requestBody":"{\"data\": {\"score\": 250, \"group\": \"gold\"}}","id":"1"}
{"name":"kj-test","version":"3","requestBody":"{\"data\": {\"score\": 330, \"group\": \"platinum\"}}","id":"1"}
但是,我希望 requestBody
的值写成 json,如下所示。
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}
我知道我已经在架构中将 requestBody 的类型指定为字符串 StructField('requestBody', StringType(), True)
,因此我以这种方式看到了输出。我怎样才能达到我期望的输出?没有 JsonType
这样的类型
编辑:
请注意,我的 requestBody
模式不会总是这样 {"data": {"score": 130, "group": "silver"}}
。对于给定的 运行,它是固定的,但另一个 运行 可能具有完全不同的架构。
本质上,需要一种从 json 字符串推断模式的方法。找到一些可能有用的 SO 帖子,将尝试这些:
Spark from_json with dynamic schema
试试下面的代码。 (我没有测试过)
使用 from_json
函数将 requestBody
json 字符串转换为结构。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
为 requestBody
准备架构
requestSchema=StructType(
[
StructField('data', StructType([StructField('group',StringType(),True),StructField('score',LongType(),True)])),
]
)
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.withColumn()
.na.drop() \
.withColumn('requestBody', from_json('requestBody',requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)
在您的 udf 中,添加以下将 python 对象转换为 JSON 字符串的方法:
import json
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return json.dumps(decrypted_json_str)
更新的解决方案(我认为这是更好的解决方案)
我们终于使用了另一个聪明的解决方案。在这里,我们定义了一个 udf get_combined_json
,它结合了给定 Row
的所有列,然后 returns 一个 json 字符串。导致我们最终的数据框只有一列,这样我们就可以将数据框写成文本文件,这样整个 json 字符串就可以原样写入,没有任何转义。以下是代码片段:
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.withColumn('combinedColumns', get_combined_json(struct([df_new[x] for x in df_new.columns]))) \
.select(col('combinedColumns'))\
.write.mode('overwrite').text(path=output_s3_bucket_path)
...
@udf(returnType=StringType())
def get_combined_json(row: Row):
return json.dumps({"requestBody": json.loads(row.requestBody),
"name": row.name,
"version": row.version,
"id": row.id})
旧的解决方案
下面是我们如何 derived/inferred 来自 requestBody
json 字符串的模式:
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
然后使用模式更新数据框。这是有效的最终代码:
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
df_new = df_new.withColumn('requestBody', from_json(col('requestBody'), request_body_schema))
df_new.write.mode('overwrite').json(path=output_s3_bucket_path)
写入S3存储桶的输出格式如下:
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}
以下是我们的 pyspark 应用程序代码片段。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.show()
+-------+--------+---------------------------------------------+---+
| name| version| requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test| 1|{"data": {"score": 130, "group": "silver"}} | 1|
|kj-test| 1|{"data": {"score": 250, "group": "gold"}} | 2|
|kj-test| 1|{"data": {"score": 330, "group": "platinum"}}| 3|
+-------+--------+---------------------------------------------+---+
decrypt_udf UDF 函数片段:
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return decrypted_json_str
当我将 spark dataframe 写入 S3 存储桶时,如下所示
df_new.write.mode('overwrite').json(path=s3outputpath)
生成的文件内容如下,这里 requestBody
的值写成 String
因此在双引号中并且也转义了内部双引号。
{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 130, \"group\": \"silver\"}}","id":"1"}
{"name":"kj-test","version":"2","requestBody":"{\"data\": {\"score\": 250, \"group\": \"gold\"}}","id":"1"}
{"name":"kj-test","version":"3","requestBody":"{\"data\": {\"score\": 330, \"group\": \"platinum\"}}","id":"1"}
但是,我希望 requestBody
的值写成 json,如下所示。
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}
我知道我已经在架构中将 requestBody 的类型指定为字符串 StructField('requestBody', StringType(), True)
,因此我以这种方式看到了输出。我怎样才能达到我期望的输出?没有 JsonType
编辑:
请注意,我的 requestBody
模式不会总是这样 {"data": {"score": 130, "group": "silver"}}
。对于给定的 运行,它是固定的,但另一个 运行 可能具有完全不同的架构。
本质上,需要一种从 json 字符串推断模式的方法。找到一些可能有用的 SO 帖子,将尝试这些:
Spark from_json with dynamic schema
试试下面的代码。 (我没有测试过)
使用 from_json
函数将 requestBody
json 字符串转换为结构。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
为 requestBody
requestSchema=StructType(
[
StructField('data', StructType([StructField('group',StringType(),True),StructField('score',LongType(),True)])),
]
)
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.withColumn()
.na.drop() \
.withColumn('requestBody', from_json('requestBody',requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)
在您的 udf 中,添加以下将 python 对象转换为 JSON 字符串的方法:
import json
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return json.dumps(decrypted_json_str)
更新的解决方案(我认为这是更好的解决方案)
我们终于使用了另一个聪明的解决方案。在这里,我们定义了一个 udf get_combined_json
,它结合了给定 Row
的所有列,然后 returns 一个 json 字符串。导致我们最终的数据框只有一列,这样我们就可以将数据框写成文本文件,这样整个 json 字符串就可以原样写入,没有任何转义。以下是代码片段:
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.withColumn('combinedColumns', get_combined_json(struct([df_new[x] for x in df_new.columns]))) \
.select(col('combinedColumns'))\
.write.mode('overwrite').text(path=output_s3_bucket_path)
...
@udf(returnType=StringType())
def get_combined_json(row: Row):
return json.dumps({"requestBody": json.loads(row.requestBody),
"name": row.name,
"version": row.version,
"id": row.id})
旧的解决方案
下面是我们如何 derived/inferred 来自 requestBody
json 字符串的模式:
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
然后使用模式更新数据框。这是有效的最终代码:
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
df_new = df_new.withColumn('requestBody', from_json(col('requestBody'), request_body_schema))
df_new.write.mode('overwrite').json(path=output_s3_bucket_path)
写入S3存储桶的输出格式如下:
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}