比较 Pyspark 数据帧的值(列表)
Comparing the values (list) of Pyspark dataframes
我想比较 list_id 列上的两个 df1 df2 数据帧:
df1 =
+---------+
| list_id|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
|[7, 8, 9]|
+---------+
df2 =
+------------+
| list_id|
+------------+
| [10, 3, 11]|
|[12, 13, 14]|
| [15, 6, 16]|
+------------+
期望的结果是:
df2 =
+-------------------+
| list_id|
+-------------------+
| [1, 2, 3, 10, 11] |
| [4, 5, 6, 15, 16] |
| [7, 8, 9] |
| [12, 13, 14] |
+-------------------+
我的目标是连接交集不为空的列表,并使其他列表与 pyspark 保持原样。
注意:我的数据帧非常大,无法使用 Spark Sql 连接。
我想出了一个无需任何连接操作即可运行的代码。
它在某种程度上非常混乱,考虑到我多次分解数组,我不知道它将如何表现内存。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df1 = (sc.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
.toDF(('c1', 'c2', 'c3'))
.select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
)
df2 = (sc.parallelize([(10, 3, 11), (12, 13, 14), (15, 6, 16)])
.toDF(('c1', 'c2', 'c3'))
.select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
)
out = (df1.union(df2)
.withColumn('key1', F.explode('id_list'))
.withColumn('key2', F.explode('id_list'))
.groupBy('key1')
.agg(F.sort_array(F.collect_set(F.col('key2'))).alias('id_list'))
.withColumn('key1', F.explode('id_list'))
.withColumn('max_length', F.max(F.size('id_list')).over(Window().partitionBy('key1')))
.where(F.col('max_length')==F.size('id_list'))
.select('id_list')
.distinct()
)
我想比较 list_id 列上的两个 df1 df2 数据帧:
df1 =
+---------+
| list_id|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
|[7, 8, 9]|
+---------+
df2 =
+------------+
| list_id|
+------------+
| [10, 3, 11]|
|[12, 13, 14]|
| [15, 6, 16]|
+------------+
期望的结果是:
df2 =
+-------------------+
| list_id|
+-------------------+
| [1, 2, 3, 10, 11] |
| [4, 5, 6, 15, 16] |
| [7, 8, 9] |
| [12, 13, 14] |
+-------------------+
我的目标是连接交集不为空的列表,并使其他列表与 pyspark 保持原样。
注意:我的数据帧非常大,无法使用 Spark Sql 连接。
我想出了一个无需任何连接操作即可运行的代码。 它在某种程度上非常混乱,考虑到我多次分解数组,我不知道它将如何表现内存。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df1 = (sc.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
.toDF(('c1', 'c2', 'c3'))
.select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
)
df2 = (sc.parallelize([(10, 3, 11), (12, 13, 14), (15, 6, 16)])
.toDF(('c1', 'c2', 'c3'))
.select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
)
out = (df1.union(df2)
.withColumn('key1', F.explode('id_list'))
.withColumn('key2', F.explode('id_list'))
.groupBy('key1')
.agg(F.sort_array(F.collect_set(F.col('key2'))).alias('id_list'))
.withColumn('key1', F.explode('id_list'))
.withColumn('max_length', F.max(F.size('id_list')).over(Window().partitionBy('key1')))
.where(F.col('max_length')==F.size('id_list'))
.select('id_list')
.distinct()
)