Spark:如何使用动态嵌套数组转置和分解列

Spark: How to transpose and explode columns with dynamic nested arrays

我应用了问题 中的算法来转置和分解带有动态数组的嵌套 spark 数据帧。

我已将新列 c 添加到数据框 """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}""",其中数组有新的 val_dynamic 字段,可以随机出现。

我正在寻找所需的输出 2(转置和分解),但即使是所需输出 1(转置)的示例也将非常有用。

输入 df:

+------------------+--------+-----------+---+
|                 a|       b|          c| id|
+------------------+--------+-----------+---+
|[{1, 1}, {11, 11}]|    null|       null|  1|
|              null|[{2, 2}]|       null|  2|
|              null|    null|[{3, 3, 3}]|  3|   !!! NOTE: Added `val_dynamic`
+------------------+--------+-----------+---+


root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)  !!! NOTE: Added `val_dynamic`
 |-- id: long (nullable = true)

需要输出 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
|  3|  c   | [{3, 3, 3}]       | !!! NOTE: Added `val_dynamic`
+---+------+-------------------+

需要输出 2 (explode_df):

+---+----+----+---+-----------+
| id|cols|date|val|val_dynamic|
+---+----+----+---+-----------+
|  1|   a|   1|  1|   null    |
|  1|   a|  11| 11|   null    |
|  2|   b|   2|  2|   null    |
|  3|   c|   3|  3|      3    |  !!! NOTE: Added `val_dynamic`
+---+----+----+---+-----------+

当前代码:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}""",
  """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}"""
    ]))

df.show()

cols = [ 'a', 'b', 'c']

#expr = stack(2,'a',a,'b',b,'c',c )
expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"


transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

transpose_df.show()

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
explode_df.show()

当前结果

AnalysisException: cannot resolve 'stack(3, 'a', `a`, 'b', `b`, 'c', `c`)' due to data type mismatch: Argument 2 (array<struct<date:bigint,val:bigint>>) != Argument 6 (array<struct<date:bigint,val:bigint,val_dynamic:bigint>>); line 1 pos 0;
'Project [id#2304L, unresolvedalias(stack(3, a, a#2301, b, b#2302, c, c#2303), Some(org.apache.spark.sql.Column$$Lambda80/0x00000008411d3040@4d9eefd0))]
+- LogicalRDD [a#2301, b#2302, c#2303, id#2304L], false

参考:

stack 要求所有堆积的列都具有相同的类型。这里的问题是数组内部的结构有不同的成员。一种方法是将缺少的成员添加到所有结构中,以便我的 的方法再次起作用。

cols = ['a', 'b', 'c']

#create a map containing all struct fields per column
existing_fields = {c:list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields)) 
      for i,c in enumerate(df.columns) if c in cols}

#get a (unique) set of all fields that exist in all columns
all_fields = set(sum(existing_fields.values(),[]))

#create a list of transform expressions to fill up the structs will null fields
transform_exprs = [f"transform({c}, e -> named_struct(" + 
    ",".join([f"'{f}', {('e.'+f) if f in existing_fields[c] else 'cast(null as long)'}" for f in all_fields]) 
    + f")) as {c}" for c in cols]

#create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)

full_struct_df 现在有架构

root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)

从这里开始,逻辑像以前一样工作:

stack_expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"

transpose_df = full_struct_df.selectExpr("id", stack_expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')

此答案的第一部分要求

  • cols中提到的每一列都是一个结构数组
  • 所有结构的所有成员都是long。此限制的原因是创建转换表达式时 cast(null as long)