Extract all keys from a JSON object in a Hadoop table using PySpark
I have a Hadoop table called table_with_json_string.
For example:
+---------------+-----------------------------------+
| creation_date | json_string_colum                 |
+---------------+-----------------------------------+
| 2020-01-29    | {"keys" : {"1" : "a", "2" : "b"}} |
+---------------+-----------------------------------+
Desired output:
+---------------+-----------------------------------+------+
| creation_date | json_string_colum                 | keys |
+---------------+-----------------------------------+------+
| 2020-01-29    | {"keys" : {"1" : "a", "2" : "b"}} | 1    |
| 2020-01-29    | {"keys" : {"1" : "a", "2" : "b"}} | 2    |
+---------------+-----------------------------------+------+
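For anyone who wants to reproduce this without the Hadoop table, here is a minimal sketch that builds the same input (it assumes the column holds valid JSON, as the show outputs below suggest; the literal values stand in for spark.table('table_with_json_string')):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# hypothetical stand-in for the real Hadoop table
df = spark.createDataFrame(
    [("2020-01-29", '{"keys" : {"1" : "a", "2" : "b"}}')],
    ["creation_date", "json_string_colum"],
)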
What I have tried:
from pyspark.sql.functions import from_json, col, explode
from pyspark.sql.types import StructType, StructField, StringType, MapType

# the JSON string is a struct with a single field "keys" whose value is a map
schema = StructType([
    StructField("keys", MapType(StringType(), StringType()), True)
])

df = spark.table('table_with_json_string').select(col("creation_date"), col("json_string_colum"))
df = df.withColumn("map_json_column", from_json("json_string_colum", schema))
df.show(1, False)
+---------------+-----------------------------------+--------------------------+
| creation_date | json_string_colum                 | map_json_column          |
+---------------+-----------------------------------+--------------------------+
| 2020-01-29    | {"keys" : {"1" : "a", "2" : "b"}} | [Map(1 -> 'a', 2 -> 'b')]|
+---------------+-----------------------------------+--------------------------+
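For reference, printing the schema at this point should show the struct wrapping the map (sketched from the schema defined above; exact nullability flags may differ):

df.select("map_json_column").printSchema()
root
 |-- map_json_column: struct (nullable = true)
 |    |-- keys: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)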
1 - How can I extract the keys from this MapType object? I know I need to use the explode function to reach the table format I want, but I still don't know how to extract the keys of the JSON object as an array.
I'm open to other approaches if it's easier to reach my goal.
Based on what you have done so far, here is how you can get the keys:
from pyspark.sql import functions as f

df = (df
    .withColumn("map_json_column", f.from_json("json_string_colum", schema))  # parse the JSON string
    .withColumn("keys", f.map_keys("map_json_column.keys"))                   # collect the map's keys into an array
    .drop("map_json_column")
    .withColumn("keys", f.explode("keys"))                                    # one output row per key
)
Result:
+-------------+--------------------+----+
|creation_date| json_string_colum|keys|
+-------------+--------------------+----+
| 2020-01-29|{"keys" : {"1" : ...| 1|
| 2020-01-29|{"keys" : {"1" : ...| 2|
+-------------+--------------------+----+
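If you prefer, the same result can also be produced in a single select; a minimal equivalent sketch (it reuses the schema defined in the question):

from pyspark.sql import functions as f

df = df.select(
    "creation_date",
    "json_string_colum",
    # parse, take the inner map's keys, and explode in one pass
    f.explode(f.map_keys(f.from_json("json_string_colum", schema).getField("keys"))).alias("keys"),
)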
Here are the detailed steps that lead to the answer above:
>>> from pyspark.sql import functions as f
>>> df.show()
+-------------+--------------------+
|creation_date| json_string_colum|
+-------------+--------------------+
| 2020-01-29|{"keys" : {"1" : ...|
+-------------+--------------------+
>>> df.withColumn("map_json_column", f.from_json("json_string_colum",schema)).show()
+-------------+--------------------+------------------+
|creation_date| json_string_colum| map_json_column|
+-------------+--------------------+------------------+
| 2020-01-29|{"keys" : {"1" : ...|[[1 -> a, 2 -> b]]|
+-------------+--------------------+------------------+
>>> df.withColumn("map_json_column", f.from_json("json_string_colum",schema)).withColumn("keys", f.map_keys("map_json_column.keys")).show()
+-------------+--------------------+------------------+------+
|creation_date| json_string_colum| map_json_column| keys|
+-------------+--------------------+------------------+------+
| 2020-01-29|{"keys" : {"1" : ...|[[1 -> a, 2 -> b]]|[1, 2]|
+-------------+--------------------+------------------+------+
>>> df.withColumn("map_json_column", f.from_json("json_string_colum",schema)).withColumn("keys", f.map_keys("map_json_column.keys")).drop("map_json_column").show()
+-------------+--------------------+------+
|creation_date| json_string_colum| keys|
+-------------+--------------------+------+
| 2020-01-29|{"keys" : {"1" : ...|[1, 2]|
+-------------+--------------------+------+
>>> df.withColumn("map_json_column", f.from_json("json_string_colum",schema)).withColumn("keys", f.map_keys("map_json_column.keys")).drop("map_json_column").withColumn("keys", f.explode("keys")).show()
+-------------+--------------------+----+
|creation_date| json_string_colum|keys|
+-------------+--------------------+----+
| 2020-01-29|{"keys" : {"1" : ...| 1|
| 2020-01-29|{"keys" : {"1" : ...| 2|
+-------------+--------------------+----+
To be clear, the map_keys function I used above is available in PySpark 2.3+.
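If you are stuck on a version older than 2.3, one fallback is a small Python UDF that pulls the keys out of the parsed map; this is a sketch under that assumption, not the only option:

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType

# Spark < 2.3 has no map_keys; a Python UDF receives the MapType value as a dict
# (rows whose JSON fails to parse yield an empty array and are dropped by explode)
map_keys_udf = f.udf(lambda m: list(m.keys()) if m else [], ArrayType(StringType()))

df = (df
    .withColumn("map_json_column", f.from_json("json_string_colum", schema))
    .withColumn("keys", f.explode(map_keys_udf("map_json_column.keys")))
    .drop("map_json_column")
)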