拆分成组并分解 pyspark 数组类型列
Splitting into groups and exploding pyspark array type column
我有静态列表 group_1
和 group_2
:
group_1 = [a,b,c,d,e,f,g]
group_2 = [h,i,j,k]
我有 pyspark 数据框 df1
,如下所示。
示例 1:
df1:
+-----+----------------------------------------+-----------------------------------------+
|id |array1 |array2 |
+-----+----------------------------------------+-----------------------------------------+
|id1 |[a,b,c,d,group_1,group_2] |[a,b,c,d,e,f,g,h,i,j,k] |
+-----+----------------------------------------+-----------------------------------------+
output_df:
+-----+-------------------|-------------------|
|id |col1 |col2 |
+-----+-------------------|-------------------|
|id1 |[a,b,c,d] |[a,b,c,d] |
|id1 |[e,f,g] |group_1 |
|id1 |[h,i,j,k] |group_2 |
+-----+-------------------|-------------------|
实际上,array2
列将包含来自 array1
列的元素。这就是我的源数据框 (source_df1
) 的样子。
如果我们看到 array1
列,则有单独的元素,如 (a,b,c,d)
以及 group_1
和 group_2
元素,但它们加在一起是不同的。
现在我想通过分解来创建 pyspark 数据框,使单个元素和组元素按 output_df
所示进行分类。
示例 1 观察: 如果我们看到输出数据框 output_df
,第二条记录 group_1
只有 [e,f,g]
,因为其他元素已经存在个别元素的一部分。
示例 2:
source_df1:
+-----+----------------------------------------+-----------------------------------------+
|id |array1 |array2 |
+-----+----------------------------------------+-----------------------------------------+
|id1 |[a,b,group_1,group_2] |[a,b,c,d,e,f,g,h,i,j,k] |
+-----+----------------------------------------+-----------------------------------------+
output_df:
+-----+-------------------|-------------------|
|id |col1 |col2 |
+-----+-------------------|-------------------|
|id1 |[a,b] |[a,b] |
|id1 |[c,d,e,f,g] |group_1 |
|id1 |[h,i,j,k] |group_2 |
+-----+-------------------|-------------------|
示例 2 观察: 如果我们看到输出数据帧 output_df
。第二条记录 group_1
只有 [c,d,e,f,g]
因为其他元素已经是单个元素的一部分。
任何人都可以帮助实现这个目标吗?
如果你可以使用 Spark 2.4+,你可以通过一些数组函数来实现:
from pyspark.sql import functions as F
df1 = df.withColumn(
"individual",
F.array_except(F.col("array1"), F.array(*[F.lit("group_1"), F.lit("group_2")]))
).withColumn(
"group_1",
F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
).withColumn(
"group_2",
F.array_except(F.array(*[F.lit(i) for i in group_2]), "individual")
).withColumn(
"array2",
F.explode(F.array(
*[
F.struct(F.array_intersect("array2", "individual").alias("col1"),
F.col("individual").cast("string").alias("col2")),
F.struct(F.array_intersect("array2", "group_1").alias("col1"),
F.lit("group_1").alias("col2")),
F.struct(F.array_intersect("array2", "group_2").alias("col1"),
F.lit("group_2").alias("col2"))
])
)
).select("id", "array2.*")
df1.show(truncate=False)
#+---+------------+------------+
#|id |col1 |col2 |
#+---+------------+------------+
#|id1|[a, b, c, d]|[a, b, c, d]|
#|id1|[e, f, g] |group_1 |
#|id1|[h, i, j, k]|group_2 |
#+---+------------+------------+
说明:
- 首先,将
array1
分成三个数组:individual
、group_1
和group_2
。每一个都包含相应组的元素。 group_1
和 group_2
中存在于 individual
中的元素将从这些组中删除。
- 然后,使用
array_intersect
函数从 array2
列获取元素,这些元素存在于上面创建的三个组数组中的每一个中。
- 最后,将上面创建的新数组展开
请注意,如果您想验证 group_1
或 group_2
是否存在于 array1
列中,您可以将 when
与 array_contains
函数一起使用:
F.when(
F.array_contains(F.col("array1"), F.lit("group_1")),
F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
)
在示例中,我假设它始终存在于 array1
。
我有静态列表 group_1
和 group_2
:
group_1 = [a,b,c,d,e,f,g]
group_2 = [h,i,j,k]
我有 pyspark 数据框 df1
,如下所示。
示例 1:
df1:
+-----+----------------------------------------+-----------------------------------------+
|id |array1 |array2 |
+-----+----------------------------------------+-----------------------------------------+
|id1 |[a,b,c,d,group_1,group_2] |[a,b,c,d,e,f,g,h,i,j,k] |
+-----+----------------------------------------+-----------------------------------------+
output_df:
+-----+-------------------|-------------------|
|id |col1 |col2 |
+-----+-------------------|-------------------|
|id1 |[a,b,c,d] |[a,b,c,d] |
|id1 |[e,f,g] |group_1 |
|id1 |[h,i,j,k] |group_2 |
+-----+-------------------|-------------------|
实际上,array2
列将包含来自 array1
列的元素。这就是我的源数据框 (source_df1
) 的样子。
如果我们看到 array1
列,则有单独的元素,如 (a,b,c,d)
以及 group_1
和 group_2
元素,但它们加在一起是不同的。
现在我想通过分解来创建 pyspark 数据框,使单个元素和组元素按 output_df
所示进行分类。
示例 1 观察: 如果我们看到输出数据框 output_df
,第二条记录 group_1
只有 [e,f,g]
,因为其他元素已经存在个别元素的一部分。
示例 2:
source_df1:
+-----+----------------------------------------+-----------------------------------------+
|id |array1 |array2 |
+-----+----------------------------------------+-----------------------------------------+
|id1 |[a,b,group_1,group_2] |[a,b,c,d,e,f,g,h,i,j,k] |
+-----+----------------------------------------+-----------------------------------------+
output_df:
+-----+-------------------|-------------------|
|id |col1 |col2 |
+-----+-------------------|-------------------|
|id1 |[a,b] |[a,b] |
|id1 |[c,d,e,f,g] |group_1 |
|id1 |[h,i,j,k] |group_2 |
+-----+-------------------|-------------------|
示例 2 观察: 如果我们看到输出数据帧 output_df
。第二条记录 group_1
只有 [c,d,e,f,g]
因为其他元素已经是单个元素的一部分。
任何人都可以帮助实现这个目标吗?
如果你可以使用 Spark 2.4+,你可以通过一些数组函数来实现:
from pyspark.sql import functions as F
df1 = df.withColumn(
"individual",
F.array_except(F.col("array1"), F.array(*[F.lit("group_1"), F.lit("group_2")]))
).withColumn(
"group_1",
F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
).withColumn(
"group_2",
F.array_except(F.array(*[F.lit(i) for i in group_2]), "individual")
).withColumn(
"array2",
F.explode(F.array(
*[
F.struct(F.array_intersect("array2", "individual").alias("col1"),
F.col("individual").cast("string").alias("col2")),
F.struct(F.array_intersect("array2", "group_1").alias("col1"),
F.lit("group_1").alias("col2")),
F.struct(F.array_intersect("array2", "group_2").alias("col1"),
F.lit("group_2").alias("col2"))
])
)
).select("id", "array2.*")
df1.show(truncate=False)
#+---+------------+------------+
#|id |col1 |col2 |
#+---+------------+------------+
#|id1|[a, b, c, d]|[a, b, c, d]|
#|id1|[e, f, g] |group_1 |
#|id1|[h, i, j, k]|group_2 |
#+---+------------+------------+
说明:
- 首先,将
array1
分成三个数组:individual
、group_1
和group_2
。每一个都包含相应组的元素。group_1
和group_2
中存在于individual
中的元素将从这些组中删除。 - 然后,使用
array_intersect
函数从array2
列获取元素,这些元素存在于上面创建的三个组数组中的每一个中。 - 最后,将上面创建的新数组展开
请注意,如果您想验证 group_1
或 group_2
是否存在于 array1
列中,您可以将 when
与 array_contains
函数一起使用:
F.when(
F.array_contains(F.col("array1"), F.lit("group_1")),
F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
)
在示例中,我假设它始终存在于 array1
。