Casting a column to JSON/dict and flattening JSON values in a column in pyspark
I'm new to Pyspark and I'm trying to figure out how to cast a column to a dict type and then flatten that column into multiple columns using explode.
Here's what my dataframe looks like:
col1 | col2 |
-----------------------
test:1 | {"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}],
{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]
test:2 | {"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}
Currently, the schema of this dataframe is:
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
The output I want looks like this:
col1 | col2 | Id | cName | pScore |
------------------------------------------------
test:1 | test1 | 17 | c1 | null |
test:1 | test1 | 01 | c2 | 0.003609|
test:1 | test8 | 1 | c11 | 0.0 |
test:1 | test8 | 012| c2 | 0.003609|
test:2 | test1:subtest2 | 18 | c13 | 0.00203 |
I'm having trouble defining the right schema for col2 so I can cast its type from String to json or dict. I'd then like to be able to explode the values into multiple columns, as shown above. Any help would be greatly appreciated. I'm using Spark 2.0+.
Thanks!
Since the key names in the JSON differ from row to row, defining a general schema for the json won't work well; I believe this is better handled with UDFs:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql import Row
import json


def extract_key(dumped_json):
    """
    Extracts the single key from the dumped json (as a string).
    """
    if dumped_json is None:
        return None
    d = json.loads(dumped_json)
    try:
        return list(d.keys())[0]
    except IndexError:
        return None


def extract_values(dumped_json):
    """
    Extracts the single array value from the dumped json and parses each element
    of the array as a spark Row.
    """
    if dumped_json is None:
        return None
    d = json.loads(dumped_json)
    try:
        return [Row(**_d) for _d in list(d.values())[0]]
    except IndexError:
        return None


# Definition of the output type of the `extract_values` function
output_values_type = t.ArrayType(t.StructType(
    [t.StructField("Id", t.StringType()),
     t.StructField("cName", t.StringType()),
     t.StructField("pScore", t.DoubleType())]
))

# Define UDFs
extract_key_udf = f.udf(extract_key, t.StringType())
extract_values_udf = f.udf(extract_values, output_values_type)

# Extract values and keys
extracted_df = df.withColumn("values", extract_values_udf("col2")). \
    withColumn("col2", extract_key_udf("col2"))

# Explode the array
exploded_df = extracted_df.withColumn("values", f.explode("values"))

# Select the wanted columns
final_df = exploded_df.select("col1", "col2", "values.Id", "values.cName",
                              "values.pScore")
The result is as desired:
+------+--------------+---+-----+--------+
|col1 |col2 |Id |cName|pScore |
+------+--------------+---+-----+--------+
|test:1|test1:subtest1|17 |c1 |0.002034|
|test:1|test1:subtest1|01 |c2 |0.003609|
|test:2|test1:subtest2|18 |c13 |0.00203 |
+------+--------------+---+-----+--------+
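As a side note, on Spark 2.3+ the String-to-dict step can also be done without Python UDFs by parsing col2 with from_json and a MapType schema, since a map keeps the per-row key names as data rather than as schema. This is only a minimal sketch, not part of the answer above, and it assumes each col2 value is a single well-formed JSON object (rows like test:1, where two objects are concatenated, would have to be split or wrapped into a JSON array first, otherwise from_json returns null and the explode drops them):
import pyspark.sql.functions as f
import pyspark.sql.types as t

# The map's values are arrays of structs matching the desired output columns.
map_schema = t.MapType(
    t.StringType(),
    t.ArrayType(t.StructType([
        t.StructField("Id", t.StringType()),
        t.StructField("cName", t.StringType()),
        t.StructField("pScore", t.DoubleType())
    ]))
)

flattened_df = df \
    .withColumn("parsed", f.from_json("col2", map_schema)) \
    .select("col1", f.explode("parsed").alias("col2", "items")) \
    .select("col1", "col2", f.explode("items").alias("item")) \
    .select("col1", "col2", "item.Id", "item.cName", "item.pScore")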
Updating my answer: I used a udf to put the key into the array, then exploded it to reach the desired output.
See the example below:
import json
import re

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

df = spark.createDataFrame([
    ('test:1',
     '{"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},'
     '{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}'),
    ('test:2', '{"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}')
], ['col1', 'col2'])

schema = ArrayType(
    StructType(
        [
            StructField("Col", StringType()),
            StructField("Id", StringType()),
            StructField("cName", StringType()),
            StructField("pScore", DoubleType())
        ]
    )
)


@f.udf(returnType=schema)
def parse_col(column):
    updated_values = []
    for it in re.finditer(r'{.*?}]}', column):
        parse = json.loads(it.group())
        for key, values in parse.items():
            for value in values:
                value['Col'] = key
                updated_values.append(value)
    return updated_values


df = df \
    .withColumn('tmp', parse_col(f.col('col2'))) \
    .withColumn('tmp', f.explode(f.col('tmp'))) \
    .select(f.col('col1'),
            f.col('tmp').Col.alias('col2'),
            f.col('tmp').Id.alias('Id'),
            f.col('tmp').cName.alias('cName'),
            f.col('tmp').pScore.alias('pScore'))

df.show()
Output:
+------+--------------+---+-----+--------+
| col1| col2| Id|cName| pScore|
+------+--------------+---+-----+--------+
|test:1| test1| 17| c1| null|
|test:1| test1| 01| c2|0.003609|
|test:1| test8| 1| c11| 0.0|
|test:1| test8|012| c2|0.003609|
|test:2|test1:subtest2| 18| c13| 0.00203|
+------+--------------+---+-----+--------+
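If you want to double-check the result, a quick printSchema on the flattened dataframe should show the struct fields from the UDF's ArrayType schema promoted to top-level columns (this is just an optional sanity check, not part of the answer itself):
# Optional check after the explode/select above.
df.printSchema()
# root
#  |-- col1: string (nullable = true)
#  |-- col2: string (nullable = true)
#  |-- Id: string (nullable = true)
#  |-- cName: string (nullable = true)
#  |-- pScore: double (nullable = true)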