Spark: How to transpose and explode columns with nested arrays
I applied the algorithm from the question below (in the comments) to transpose and explode a nested Spark DataFrame.
When I define cols = ['a', 'b'] I get an empty DataFrame, but when I define cols = ['a'] I get the transformed DataFrame for column a. See the Current code section below for details. Any help would be appreciated.
I am looking for desired output 2 (transpose and explode), but even an example of desired output 1 (transpose only) would be very useful.
Note: this is a minimal example highlighting the problem; in reality the DataFrame schema and the array lengths differ from the example.
Input df:
+---+------------------+--------+
| id| a| b|
+---+------------------+--------+
| 1|[{1, 1}, {11, 11}]| null|
| 2| null|[{2, 2}]|
+---+------------------+--------+
root
|-- id: long (nullable = true)
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
Desired output 1 (transpose_df):
+---+------+-------------------+
| id| cols | arrays |
+---+------+-------------------+
| 1| a | [{1, 1}, {11, 11}]|
| 2| b | [{2, 2}] |
+---+------+-------------------+
Desired output 2 (explode_df):
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
| 1| a| 1| 1|
| 1| a| 11| 11|
| 2| b| 2| 2|
+---+----+----+---+
Current code:
import pyspark.sql.functions as f
df = spark.read.json(sc.parallelize([
"""{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
"""{"id":2,"b":[{"date":2,"val":2}]}}"""]))
cols = ['a', 'b']
expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols ]
transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')
explode_df.show()
Current result:
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+
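A likely cause (an assumption, not verified here): `TRANSFORM(col, ...)` returns NULL when `col` itself is NULL, and `flatten` propagates a NULL nested array, so the whole `arrays` column ends up NULL and `inline(arrays)` emits no rows. One possible workaround is to drop the NULL entries before flattening with Spark SQL's higher-order `filter` function. A minimal sketch of the expression building (only the string construction runs here; the `selectExpr` call assumes the question's `df`):

```python
# Sketch: build a single SQL expression that tags each column's array with its
# name, drops the NULL arrays produced by TRANSFORM on NULL columns, and
# flattens the rest. Assumes the question's schema (arrays of {date, val}).
cols = ['a', 'b']

transforms = ", ".join(
    "TRANSFORM({c}, el -> STRUCT('{c}' AS cols, el.date, el.val))".format(c=c)
    for c in cols
)
arrays_expr = "flatten(filter(array({t}), x -> x IS NOT NULL))".format(t=transforms)
print(arrays_expr)

# Applied to the question's DataFrame (not executed here):
# explode_df = df.selectExpr("id", "inline({})".format(arrays_expr))
```

The `filter(..., x -> x IS NOT NULL)` wrapper is the key difference from the failing code: it removes the NULL per-column arrays so `flatten` no longer returns NULL for rows where one of the columns is missing.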
For the first step, stack may work better than transpose.
expr = f"stack({len(cols)}," + \
",".join([f"'{c}',{c}" for c in cols]) + \
")"
#expr = stack(2,'a',a,'b',b)
transpose_df = df.selectExpr("id", expr) \
.withColumnRenamed("col0", "cols") \
.withColumnRenamed("col1", "arrays") \
.filter("not arrays is null")
explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
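To make the answer concrete, the `stack` expression it builds can be wrapped in a small helper (the function name `stack_expr` is illustrative, not part of any API). Only the string construction is shown; the default output column names `col0`/`col1` that the answer renames are what Spark's `stack` produces:

```python
def stack_expr(cols):
    """Build a stack() SQL expression that turns wide array columns into
    (name, array) rows, as in the answer above."""
    pairs = ",".join("'{c}',{c}".format(c=c) for c in cols)
    return "stack({n},{p})".format(n=len(cols), p=pairs)

print(stack_expr(['a', 'b']))  # stack(2,'a',a,'b',b)
```

This matches the commented expression in the answer, and generalizes to any number of array columns, which may help given the note that the real schema differs from the minimal example.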