PySpark - 使用多个连接列时 CPU 重笛卡尔连接问题

PySpark - Issue with CPU heavy cartesian join when using multiple join columns

背景/场景:

我有两个 table:一个 1-2 百万条目 table 具有 交易 形式

TRX-ID , PROCESS-ID , ACTOR-ID

另外一个 participant-lookup(系统的多个用户类别之一)table 形式

USER-ID , PARTICIPANT-ID

事务table由于历史原因有点乱。 PROCESS-ID 可以是参与者 ID,ACTOR-ID 可以是不同类型用户的用户 ID。在某些情况下,PROCESS-ID 是其他东西,而 ACTOR-ID 是参与者的用户 ID。

我需要加入 交易participant-lookup table 以获得 participant-id所有交易。我尝试了两种方法。

(我在代码片段中省略了一些代码步骤,并专注于连接部分。假设 df 变量是数据框,我 select 右列支持联合。)

第一种方法:

transactions_df.join(pt_lookup_df, (transactions_df['actor-id'] == pt_lookup_df['user-id']) | (transactions_df['process-id'] == pt_lookup_df['participant-id']))

这个连接的代码非常慢。它最终在我的工作中 运行 在 10 个实例 AWS glue 集群上花费了 45 分钟,所有执行者的负载接近 99%。

第二种方法:

我意识到有些交易已经有了participant-id,我不需要为他们加入。所以我改为:

transactions_df_1.join(pt_lookup_df, (transactions_df_1['actor-id'] == pt_lookup_df['user-id']))
transactions_df_2 = transactions_df_2.withColumnRenamed('process-id', 'participant-id')
transactions_df_1.union(transactions_df_2)

5 分钟内完成!

两种方法都给出了正确的结果。

问题

我不明白为什么一个这么慢另一个不慢。第二种方法中排除的数据量很小。所以 transactions_df_2 只占总数据的一小部分。

查看计划,影响主要围绕在方法 1 而非方法 2 中完成的笛卡尔积。所以我认为,这是性能瓶颈。我仍然不明白这怎么会导致计算时间相差 40 分钟。

谁能解释一下?

DAG 中的笛卡尔积通常是 Spark 中的警告标志吗?

总结

条件中包含多个列的连接似乎触发了极慢的笛卡尔积运算。我是否应该对较小的数据集进行广播操作来避免这种情况?

DAG 方法 1

DAG 方法 2

这是因为 Cartesian Product join 和常规 join 不涉及相同的数据混洗过程。即使数据量相似,洗牌量也不同。

article 解释了额外洗牌的来源。