Spark: How to transpose and explode columns with nested arrays

I applied the algorithm from the code below (in the comments) to transpose and explode a nested Spark dataframe.

When I define cols = ['a', 'b'] I get an empty dataframe, but when I define cols = ['a'] I get the transformed dataframe for column a. See the Current code section below for details. Any help would be greatly appreciated.

I'm looking for desired output 2 (transpose and explode), but even an example of desired output 1 (transpose) would be very useful.

Note: this is a minimal example highlighting the problem; in reality the dataframe schema and array lengths differ from this example.

Input df:

+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[{1, 1}, {11, 11}]|    null|
|  2|              null|[{2, 2}]|
+---+------------------+--------+


root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

Desired output 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
+---+------+-------------------+

Desired output 2 (explode_df):

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+

Current code:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}"""]))

cols = ['a', 'b']

expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols ]

transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
             
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')

explode_df.show()

Current result:

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+
For the first step, stack may be a better choice than transform.


expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"
# e.g. expr = "stack(2,'a',a,'b',b)"

transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')