如何从 JSON 字符串列中提取键和值作为单独的列
How to extract key and value as separate columns from a JSON string column
我有一个包含列 col1
和 col2
的 DataFrame,其中 col2
可以包含 JSON 字符串或纯字符串。如果它包含一个可解析的 JSON 字符串,我需要提取键和值以将列分隔为列表,否则它应该 return 一个空列表作为第三行和第四行。
我正在使用 pyspark 来实现这一点。在此感谢任何帮助。
源数据帧:
+-----+----------------------------------------------+
| col1| col2 |
+-----+----------------------------------------------+
|a |{"key1":"val1","key2":"val2"} |
|b |{"key5":"val5", "key6":"val6", "key7":"val7"} |
|c |"just a string" |
|d | null |
+----------------------------------------------------+
所需的数据帧:
+-----+----------------+----------------+
| col1| keys | values |
+-----+----------------+---------------+
|a |[key1,key2] |[val1,val2] |
|b |[key5,key6,key7]|[val5,val6,val7]|
|c |[] |[] |
|d |[] |[] |
+-----+----------------+----------------+
你可以使用来自 sql 模块的 pyspark 函数爆炸:
来自文档:
pyspark.sql.functions.explode(col)[来源]
Returns 为给定数组或映射中的每个元素创建一个新行。除非另有说明,否则对数组中的元素使用默认列名 col,对映射中的元素使用默认列名 key 和 value。
from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
| a| b|
+---+-----+
jsonpath 中的键是 $[*~]
,值是 $[*]
。但这似乎不受 get_json_object
.
的支持
所以我们需要用户定义函数:
def json_keys(s):
import json
try:
data = json.loads(s)
return list(data.keys())
except:
return None
spark.udf.register("json_keys", json_keys)
def json_values(s):
import json
try:
data = json.loads(s)
return list(data.values())
except:
return None
spark.udf.register("json_values", json_values)
df.selectExpr("col1", "json_keys(col2) keys", "json_values(col2) values").collect()
产生:
+----+------------+------------+
|col1| keys| values|
+----+------------+------------+
| a|[key1, key2]|[val1, val2]|
| b|[key5, key6]|[val7, val6]|
| c| null| null|
| d| null| null|
+----+------------+------------+
老问题,但我不太喜欢建议为此使用 UDF 的其他答案。
对于 Spark 2.2+,您应该使用 from_json
function to convert json strings into map type then use map_keys
function to gets the keys and map_values
函数来获取值:
from pyspark.sql.functions import from_json, map_keys, map_values
df1 = df.withColumn('col2', from_json('col2', 'map<string,string>')) \
.withColumn('keys', map_keys('col2')) \
.withColumn('values', map_values('col2')) \
.select('col1', 'keys', 'values')
#+----+------------------+------------------+
#|col1|keys |values |
#+----+------------------+------------------+
#|a |[key1, key2] |[val1, val2] |
#|b |[key5, key6, key7]|[val5, val6, val7]|
#|c |null |null |
#|d |null |null |
#+----+------------------+------------------+
我有一个包含列 col1
和 col2
的 DataFrame,其中 col2
可以包含 JSON 字符串或纯字符串。如果它包含一个可解析的 JSON 字符串,我需要提取键和值以将列分隔为列表,否则它应该 return 一个空列表作为第三行和第四行。
我正在使用 pyspark 来实现这一点。在此感谢任何帮助。
源数据帧:
+-----+----------------------------------------------+
| col1| col2 |
+-----+----------------------------------------------+
|a |{"key1":"val1","key2":"val2"} |
|b |{"key5":"val5", "key6":"val6", "key7":"val7"} |
|c |"just a string" |
|d | null |
+----------------------------------------------------+
所需的数据帧:
+-----+----------------+----------------+
| col1| keys | values |
+-----+----------------+---------------+
|a |[key1,key2] |[val1,val2] |
|b |[key5,key6,key7]|[val5,val6,val7]|
|c |[] |[] |
|d |[] |[] |
+-----+----------------+----------------+
你可以使用来自 sql 模块的 pyspark 函数爆炸:
来自文档:
pyspark.sql.functions.explode(col)[来源] Returns 为给定数组或映射中的每个元素创建一个新行。除非另有说明,否则对数组中的元素使用默认列名 col,对映射中的元素使用默认列名 key 和 value。
from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
| a| b|
+---+-----+
jsonpath 中的键是 $[*~]
,值是 $[*]
。但这似乎不受 get_json_object
.
所以我们需要用户定义函数:
def json_keys(s):
import json
try:
data = json.loads(s)
return list(data.keys())
except:
return None
spark.udf.register("json_keys", json_keys)
def json_values(s):
import json
try:
data = json.loads(s)
return list(data.values())
except:
return None
spark.udf.register("json_values", json_values)
df.selectExpr("col1", "json_keys(col2) keys", "json_values(col2) values").collect()
产生:
+----+------------+------------+
|col1| keys| values|
+----+------------+------------+
| a|[key1, key2]|[val1, val2]|
| b|[key5, key6]|[val7, val6]|
| c| null| null|
| d| null| null|
+----+------------+------------+
老问题,但我不太喜欢建议为此使用 UDF 的其他答案。
对于 Spark 2.2+,您应该使用 from_json
function to convert json strings into map type then use map_keys
function to gets the keys and map_values
函数来获取值:
from pyspark.sql.functions import from_json, map_keys, map_values
df1 = df.withColumn('col2', from_json('col2', 'map<string,string>')) \
.withColumn('keys', map_keys('col2')) \
.withColumn('values', map_values('col2')) \
.select('col1', 'keys', 'values')
#+----+------------------+------------------+
#|col1|keys |values |
#+----+------------------+------------------+
#|a |[key1, key2] |[val1, val2] |
#|b |[key5, key6, key7]|[val5, val6, val7]|
#|c |null |null |
#|d |null |null |
#+----+------------------+------------------+