在 Spark 结构化流中从数组 JSON 中解释数据帧
Interpreting dataframe out of Array JSON in Spark Structured Streaming
我的数据框中有一列低于 json 的字符串,我怎样才能 explode/flatten 它获得单级数据框?
目前架构是
df
|-json_data (StringType)
如何获得以下架构的 df?
df
|-key1
|-key2_signal
|-key2_value
[{
"key1": 1647336730000,
"key2": [
{
"signal": "signal_key_1",
"value": 73.6
},
{
"signal": "signal_key_2",
"value": 3.375
},
{
"signal": "signal_key_3",
"value": 13.82
}]
}]
我想你应该这样做。
说明:
- 创建用于对示例数据进行练习的虚拟对象。 (df1 创建)
- 获取字符串列并将其转换为实际的 JSON。
- Select 所有字段。
- 分解 key2 因为它是一个列表。
- select key2
中的所有相关键
- selectkey1和key2相关数据。使用星号 (*)
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
df1 = spark.createDataFrame(
[{'json_data': [json.dumps({
"key1": 1647336730000,
"key2": [
{
"signal": "signal_key_1",
"value": 73.6
},
{
"signal": "signal_key_2",
"value": 3.375
},
{
"signal": "signal_key_3",
"value": 13.82
}]
})]}],
schema=StructType([StructField('json_data', StringType(), True)])
)
(
df1
.withColumn('actual', f.from_json(f.col('json_data'), f.schema_of_json(f.lit(df1.select(f.col("json_data")).first()[0]))))
.withColumn('data', f.explode('actual'))
.drop('actual')
.withColumn('key2', f.explode('data.key2'))
.drop('json_data')
.select('data.key1', 'key2.*').show()
)
# +-------------+------------+-----+
# | key1| signal|value|
# +-------------+------------+-----+
# |1647336730000|signal_key_1| 73.6|
# |1647336730000|signal_key_2|3.375|
# |1647336730000|signal_key_3|13.82|
# +-------------+------------+-----+
我的数据框中有一列低于 json 的字符串,我怎样才能 explode/flatten 它获得单级数据框? 目前架构是
df
|-json_data (StringType)
如何获得以下架构的 df?
df
|-key1
|-key2_signal
|-key2_value
[{
"key1": 1647336730000,
"key2": [
{
"signal": "signal_key_1",
"value": 73.6
},
{
"signal": "signal_key_2",
"value": 3.375
},
{
"signal": "signal_key_3",
"value": 13.82
}]
}]
我想你应该这样做。
说明:
- 创建用于对示例数据进行练习的虚拟对象。 (df1 创建)
- 获取字符串列并将其转换为实际的 JSON。
- Select 所有字段。
- 分解 key2 因为它是一个列表。
- select key2 中的所有相关键
- selectkey1和key2相关数据。使用星号 (*)
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
df1 = spark.createDataFrame(
[{'json_data': [json.dumps({
"key1": 1647336730000,
"key2": [
{
"signal": "signal_key_1",
"value": 73.6
},
{
"signal": "signal_key_2",
"value": 3.375
},
{
"signal": "signal_key_3",
"value": 13.82
}]
})]}],
schema=StructType([StructField('json_data', StringType(), True)])
)
(
df1
.withColumn('actual', f.from_json(f.col('json_data'), f.schema_of_json(f.lit(df1.select(f.col("json_data")).first()[0]))))
.withColumn('data', f.explode('actual'))
.drop('actual')
.withColumn('key2', f.explode('data.key2'))
.drop('json_data')
.select('data.key1', 'key2.*').show()
)
# +-------------+------------+-----+
# | key1| signal|value|
# +-------------+------------+-----+
# |1647336730000|signal_key_1| 73.6|
# |1647336730000|signal_key_2|3.375|
# |1647336730000|signal_key_3|13.82|
# +-------------+------------+-----+