Spark: How to transpose and explode columns with dynamic nested arrays
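I applied the algorithm from an earlier question to transpose and explode a nested Spark dataframe with dynamic arrays.
I have added a new column `c` to the dataframe, """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}""", where the array has a new `val_dynamic` field that can appear randomly.
I'm looking for desired output 2 (transpose and explode), but even an example for desired output 1 (transpose) would be very helpful.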
Input df:
+------------------+--------+-----------+---+
|                 a|       b|          c| id|
+------------------+--------+-----------+---+
|[{1, 1}, {11, 11}]|    null|       null|  1|
|              null|[{2, 2}]|       null|  2|
|              null|    null|[{3, 3, 3}]|  3| !!! NOTE: Added `val_dynamic`
+------------------+--------+-----------+---+
root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true) !!! NOTE: Added `val_dynamic`
 |-- id: long (nullable = true)
Desired output 1 (transpose_df):
+---+------+-------------------+
| id| cols | arrays            |
+---+------+-------------------+
|  1| a    | [{1, 1}, {11, 11}]|
|  2| b    | [{2, 2}]          |
|  3| c    | [{3, 3, 3}]       | !!! NOTE: Added `val_dynamic`
+---+------+-------------------+
Desired output 2 (explode_df):
+---+----+----+---+-----------+
| id|cols|date|val|val_dynamic|
+---+----+----+---+-----------+
|  1|   a|   1|  1|       null|
|  1|   a|  11| 11|       null|
|  2|   b|   2|  2|       null|
|  3|   c|   3|  3|          3| !!! NOTE: Added `val_dynamic`
+---+----+----+---+-----------+
Current code:
import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
    """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
    """{"id":2,"b":[{"date":2,"val":2}]}""",
    """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}"""
]))
df.show()

cols = ['a', 'b', 'c']

# expr = stack(3,'a',a,'b',b,'c',c)
expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"

transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")
transpose_df.show()

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
explode_df.show()
Current result:
AnalysisException: cannot resolve 'stack(3, 'a', `a`, 'b', `b`, 'c', `c`)' due to data type mismatch: Argument 2 (array<struct<date:bigint,val:bigint>>) != Argument 6 (array<struct<date:bigint,val:bigint,val_dynamic:bigint>>); line 1 pos 0;
'Project [id#2304L, unresolvedalias(stack(3, a, a#2301, b, b#2302, c, c#2303), Some(org.apache.spark.sql.Column$$Lambda80/0x00000008411d3040@4d9eefd0))]
+- LogicalRDD [a#2301, b#2302, c#2303, id#2304L], false
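`stack` requires that all stacked columns have the same type. The problem here is that the structs inside the arrays have different members. One approach is to add the missing members to all structs so that my earlier approach works again.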
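Before calling `stack`, it can help to check whether the candidate columns actually share one type. The following is a minimal sketch, assuming the column types are available as (name, type string) pairs in the shape that `df.dtypes` returns. The helper name `group_columns_by_type` is hypothetical, and the sample pairs are copied from the schema and error message above rather than read from a live dataframe:

```python
from collections import defaultdict

def group_columns_by_type(dtypes, cols):
    """Group the selected column names by their type string."""
    groups = defaultdict(list)
    for name, type_string in dtypes:
        if name in cols:
            groups[type_string].append(name)
    return dict(groups)

# Stand-in for df.dtypes (assumption: types taken from the schema and
# the AnalysisException shown in the question)
sample_dtypes = [
    ("a", "array<struct<date:bigint,val:bigint>>"),
    ("b", "array<struct<date:bigint,val:bigint>>"),
    ("c", "array<struct<date:bigint,val:bigint,val_dynamic:bigint>>"),
    ("id", "bigint"),
]

groups = group_columns_by_type(sample_dtypes, ["a", "b", "c"])

# More than one group means stack would reject the columns
stackable = len(groups) == 1
print(groups)
print("stackable:", stackable)
```

With the sample types, `a` and `b` fall into one group and `c` into another, which mirrors the mismatch between Argument 2 and Argument 6 in the error.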
cols = ['a', 'b', 'c']

# create a map containing all struct fields per column
existing_fields = {c: list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields))
                   for i, c in enumerate(df.columns) if c in cols}

# get the (unique) set of all fields that exist in any of the columns
all_fields = set(sum(existing_fields.values(), []))

# create a list of transform expressions to fill up the structs with null fields
transform_exprs = [f"transform({c}, e -> named_struct(" +
                   ",".join([f"'{name}', {('e.' + name) if name in existing_fields[c] else 'cast(null as long)'}" for name in all_fields])
                   + f")) as {c}" for c in cols]

# create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)
`full_struct_df` now has the schema:
root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
From here, the logic works as before:
stack_expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"

transpose_df = full_struct_df.selectExpr("id", stack_expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
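To make the semantics of the three steps (stack, null filter, inline) concrete, here is a plain-Python emulation on ordinary dicts. It is only an illustration of what the Spark expressions do to the sample data, not Spark code; the function name `transpose_and_explode` and the explicit `all_fields` list are assumptions made for this sketch:

```python
def transpose_and_explode(rows, cols, all_fields):
    """Emulate stack + null filter + inline on plain dicts."""
    result = []
    for row in rows:
        for col in cols:                  # stack: one candidate row per column
            array = row.get(col)
            if array is None:             # filter("not arrays is null")
                continue
            for struct in array:          # inline: one output row per struct
                out = {"id": row["id"], "cols": col}
                for field in all_fields:  # missing members become None
                    out[field] = struct.get(field)
                result.append(out)
    return result

# The three sample records from the question
rows = [
    {"id": 1, "a": [{"date": 1, "val": 1}, {"date": 11, "val": 11}]},
    {"id": 2, "b": [{"date": 2, "val": 2}]},
    {"id": 3, "c": [{"date": 3, "val": 3, "val_dynamic": 3}]},
]

exploded = transpose_and_explode(rows, ["a", "b", "c"], ["date", "val", "val_dynamic"])
for r in exploded:
    print(r)
```

The four resulting dicts correspond to the four rows of desired output 2, with `None` standing in for `null`.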
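The first part of this answer requires that
- each column mentioned in `cols` is an array of structs
- all members of all structs are of type `long`
The reason for the second restriction is the `cast(null as long)` used when creating the transform expressions.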
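The `long`-only restriction could probably be relaxed by casting each missing member to the type it has in the columns where it does exist. The following is a sketch under assumptions, not a tested Spark solution: `field_types` is a hypothetical dict mapping each column to {field name: SQL type string}, which in Spark could presumably be built from `df.schema` with each field's `dataType.simpleString()`. It also assumes that a field name has the same type in every column where it appears. The `string` type for `val_dynamic` is made up to show a non-`long` member. Only the expression strings are built here:

```python
def build_transform_exprs(field_types, cols):
    """Build one transform(...) SQL expression per column,
    padding missing struct members with typed nulls."""
    # union of all fields with their type; the first occurrence wins
    all_fields = {}
    for c in cols:
        for name, sql_type in field_types[c].items():
            all_fields.setdefault(name, sql_type)

    exprs = []
    for c in cols:
        members = ",".join(
            f"'{name}', "
            + (f"e.{name}" if name in field_types[c] else f"cast(null as {sql_type})")
            for name, sql_type in all_fields.items()
        )
        exprs.append(f"transform({c}, e -> named_struct({members})) as {c}")
    return exprs

# Hypothetical field types (assumption: val_dynamic is a string here)
field_types = {
    "a": {"date": "bigint", "val": "bigint"},
    "b": {"date": "bigint", "val": "bigint"},
    "c": {"date": "bigint", "val": "bigint", "val_dynamic": "string"},
}

exprs = build_transform_exprs(field_types, ["a", "b", "c"])
for e in exprs:
    print(e)
```

The resulting strings are meant to be passed to `df.selectExpr("id", *exprs)` in place of `transform_exprs`. Using a dict instead of a set for the field union also keeps the member order the same on every run.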