PySpark 将 JSON 字符串分解为多列
PySpark Explode JSON String into Multiple Columns
我有一个包含一列字符串数据类型的数据框。该字符串表示一个 api 请求 return 一个 json.
df = spark.createDataFrame([
("[{original={ranking=1.0, input=top3}, response=[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]",1)],
"col1:string, col2:int")
df.show()
生成的数据框如下:
+--------------------+----+
| col1|col2|
+--------------------+----+
|[{original={ranki...| 1|
+--------------------+----+
我希望有 col2 的输出,并有来自响应的另外两列。 Col3 将捕获玩家名称,由 to= 指示,而 col 4 将其位置由 position= 指示。以及数据框现在将有三行,因为有三个玩家。示例:
+----+------+-------+
|col2| col3| col4|
+----+------+-------+
| 1| Sam| guard|
| 1| John| center|
| 1|Andrew|forward|
+----+------+-------+
我读到我可以利用类似的东西:
df.withColumn("col3",explode(from_json("col1")))
但是,我不确定如何展开,因为我想要两列而不是一列并且需要架构。
注意,我可以使用 json_dumps 将响应修改为 return 只有字符串的响应部分或...
[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]
如果像上面提到的那样简化输出,您可以定义一个简单的 JSON 模式并将 JSON 字符串转换为 StructType
并读取每个字段
输入
df = spark.createDataFrame([("[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]",1)], "col1:string, col2:int")
# +-----------------------------------------------------------------------------------------------------------------+----+
# |col1 |col2|
# +-----------------------------------------------------------------------------------------------------------------+----+
# |[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]|1 |
# +-----------------------------------------------------------------------------------------------------------------+----+
这就是转变
from pyspark.sql import functions as F
from pyspark.sql import types as T
schema = T.ArrayType(T.StructType([
T.StructField('to', T.StringType()),
T.StructField('position', T.StringType())
]))
(df
.withColumn('temp', F.explode(F.from_json('col1', schema=schema)))
.select(
F.col('col2'),
F.col('temp.to').alias('col3'),
F.col('temp.position').alias('col4'),
)
.show()
)
# Output
# +----+------+-------+
# |col2| col3| col4|
# +----+------+-------+
# | 1| Sam| guard|
# | 1| John| center|
# | 1|Andrew|forward|
# +----+------+-------+
我有一个包含一列字符串数据类型的数据框。该字符串表示一个 api 请求 return 一个 json.
df = spark.createDataFrame([
("[{original={ranking=1.0, input=top3}, response=[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]",1)],
"col1:string, col2:int")
df.show()
生成的数据框如下:
+--------------------+----+
| col1|col2|
+--------------------+----+
|[{original={ranki...| 1|
+--------------------+----+
我希望有 col2 的输出,并有来自响应的另外两列。 Col3 将捕获玩家名称,由 to= 指示,而 col 4 将其位置由 position= 指示。以及数据框现在将有三行,因为有三个玩家。示例:
+----+------+-------+
|col2| col3| col4|
+----+------+-------+
| 1| Sam| guard|
| 1| John| center|
| 1|Andrew|forward|
+----+------+-------+
我读到我可以利用类似的东西:
df.withColumn("col3",explode(from_json("col1")))
但是,我不确定如何展开,因为我想要两列而不是一列并且需要架构。
注意,我可以使用 json_dumps 将响应修改为 return 只有字符串的响应部分或...
[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]
如果像上面提到的那样简化输出,您可以定义一个简单的 JSON 模式并将 JSON 字符串转换为 StructType
并读取每个字段
输入
df = spark.createDataFrame([("[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]",1)], "col1:string, col2:int")
# +-----------------------------------------------------------------------------------------------------------------+----+
# |col1 |col2|
# +-----------------------------------------------------------------------------------------------------------------+----+
# |[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]|1 |
# +-----------------------------------------------------------------------------------------------------------------+----+
这就是转变
from pyspark.sql import functions as F
from pyspark.sql import types as T
schema = T.ArrayType(T.StructType([
T.StructField('to', T.StringType()),
T.StructField('position', T.StringType())
]))
(df
.withColumn('temp', F.explode(F.from_json('col1', schema=schema)))
.select(
F.col('col2'),
F.col('temp.to').alias('col3'),
F.col('temp.position').alias('col4'),
)
.show()
)
# Output
# +----+------+-------+
# |col2| col3| col4|
# +----+------+-------+
# | 1| Sam| guard|
# | 1| John| center|
# | 1|Andrew|forward|
# +----+------+-------+