如何从 JSON 字符串列中提取键和值作为单独的列

How to extract key and value as separate columns from a JSON string column

我有一个包含列 col1col2 的 DataFrame,其中 col2 可以包含 JSON 字符串或纯字符串。如果它包含一个可解析的 JSON 字符串,我需要提取键和值以将列分隔为列表,否则它应该 return 一个空列表作为第三行和第四行。

我正在使用 pyspark 来实现这一点。在此感谢任何帮助。

源数据帧:

+-----+----------------------------------------------+
| col1|       col2                                   |
+-----+----------------------------------------------+
|a    |{"key1":"val1","key2":"val2"}                 |
|b    |{"key5":"val5", "key6":"val6", "key7":"val7"} |
|c    |"just a string"                               |
|d    | null                                         |
+----------------------------------------------------+

所需的数据帧:

+-----+----------------+----------------+
| col1|   keys         |   values       |
+-----+----------------+---------------+
|a    |[key1,key2]     |[val1,val2]     |
|b    |[key5,key6,key7]|[val5,val6,val7]|
|c    |[]              |[]              |
|d    |[]              |[]              |
+-----+----------------+----------------+

你可以使用来自 sql 模块的 pyspark 函数爆炸:
来自文档:

pyspark.sql.functions.explode(col)[来源] Returns 为给定数组或映射中的每个元素创建一个新行。除非另有说明,否则对数组中的元素使用默认列名 col,对映射中的元素使用默认列名 key 和 value。

from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]

eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+

jsonpath 中的键是 $[*~],值是 $[*]。但这似乎不受 get_json_object.

的支持

所以我们需要用户定义函数:

def json_keys(s):
     import json
     try:
         data = json.loads(s)
         return list(data.keys())
     except:
         return None
spark.udf.register("json_keys", json_keys)

def json_values(s):
     import json
     try:
         data = json.loads(s)
         return list(data.values())
     except:
         return None
spark.udf.register("json_values", json_values)

df.selectExpr("col1", "json_keys(col2) keys", "json_values(col2) values").collect()

产生:

+----+------------+------------+
|col1|        keys|      values|
+----+------------+------------+
|   a|[key1, key2]|[val1, val2]|
|   b|[key5, key6]|[val7, val6]|
|   c|        null|        null|
|   d|        null|        null|
+----+------------+------------+

老问题,但我不太喜欢建议为此使用 UDF 的其他答案。

对于 Spark 2.2+,您应该使用 from_json function to convert json strings into map type then use map_keys function to gets the keys and map_values 函数来获取值:

from pyspark.sql.functions import from_json, map_keys, map_values

df1 = df.withColumn('col2', from_json('col2', 'map<string,string>')) \
        .withColumn('keys', map_keys('col2')) \
        .withColumn('values', map_values('col2')) \
        .select('col1', 'keys', 'values') 

#+----+------------------+------------------+
#|col1|keys              |values            |
#+----+------------------+------------------+
#|a   |[key1, key2]      |[val1, val2]      |
#|b   |[key5, key6, key7]|[val5, val6, val7]|
#|c   |null              |null              |
#|d   |null              |null              |
#+----+------------------+------------------+