PySpark - 使用多个连接列时 CPU 重笛卡尔连接问题
PySpark - Issue with CPU heavy cartesian join when using multiple join columns
背景/场景:
我有两个 table:一个 1-2 百万条目 table 具有 交易 形式
TRX-ID , PROCESS-ID , ACTOR-ID
另外一个 participant-lookup(系统的多个用户类别之一)table 形式
USER-ID , PARTICIPANT-ID
事务table由于历史原因有点乱。 PROCESS-ID 可以是参与者 ID,ACTOR-ID 可以是不同类型用户的用户 ID。在某些情况下,PROCESS-ID 是其他东西,而 ACTOR-ID 是参与者的用户 ID。
我需要加入 交易 和 participant-lookup table 以获得 participant-id所有交易。我尝试了两种方法。
(我在代码片段中省略了一些代码步骤,并专注于连接部分。假设 df 变量是数据框,我 select 右列支持联合。)
第一种方法:
transactions_df.join(pt_lookup_df, (transactions_df['actor-id'] == pt_lookup_df['user-id']) | (transactions_df['process-id'] == pt_lookup_df['participant-id']))
这个连接的代码非常慢。它最终在我的工作中 运行 在 10 个实例 AWS glue 集群上花费了 45 分钟,所有执行者的负载接近 99%。
第二种方法:
我意识到有些交易已经有了participant-id,我不需要为他们加入。所以我改为:
transactions_df_1.join(pt_lookup_df, (transactions_df_1['actor-id'] == pt_lookup_df['user-id']))
transactions_df_2 = transactions_df_2.withColumnRenamed('process-id', 'participant-id')
transactions_df_1.union(transactions_df_2)
5 分钟内完成!
两种方法都给出了正确的结果。
问题
我不明白为什么一个这么慢另一个不慢。第二种方法中排除的数据量很小。所以 transactions_df_2 只占总数据的一小部分。
查看计划,影响主要围绕在方法 1 而非方法 2 中完成的笛卡尔积。所以我认为,这是性能瓶颈。我仍然不明白这怎么会导致计算时间相差 40 分钟。
谁能解释一下?
DAG 中的笛卡尔积通常是 Spark 中的警告标志吗?
总结
条件中包含多个列的连接似乎触发了极慢的笛卡尔积运算。我是否应该对较小的数据集进行广播操作来避免这种情况?
DAG 方法 1
DAG 方法 2
这是因为 Cartesian Product join
和常规 join
不涉及相同的数据混洗过程。即使数据量相似,洗牌量也不同。
这 article 解释了额外洗牌的来源。
背景/场景:
我有两个 table:一个 1-2 百万条目 table 具有 交易 形式
TRX-ID , PROCESS-ID , ACTOR-ID
另外一个 participant-lookup(系统的多个用户类别之一)table 形式
USER-ID , PARTICIPANT-ID
事务table由于历史原因有点乱。 PROCESS-ID 可以是参与者 ID,ACTOR-ID 可以是不同类型用户的用户 ID。在某些情况下,PROCESS-ID 是其他东西,而 ACTOR-ID 是参与者的用户 ID。
我需要加入 交易 和 participant-lookup table 以获得 participant-id所有交易。我尝试了两种方法。
(我在代码片段中省略了一些代码步骤,并专注于连接部分。假设 df 变量是数据框,我 select 右列支持联合。)
第一种方法:
transactions_df.join(pt_lookup_df, (transactions_df['actor-id'] == pt_lookup_df['user-id']) | (transactions_df['process-id'] == pt_lookup_df['participant-id']))
这个连接的代码非常慢。它最终在我的工作中 运行 在 10 个实例 AWS glue 集群上花费了 45 分钟,所有执行者的负载接近 99%。
第二种方法:
我意识到有些交易已经有了participant-id,我不需要为他们加入。所以我改为:
transactions_df_1.join(pt_lookup_df, (transactions_df_1['actor-id'] == pt_lookup_df['user-id']))
transactions_df_2 = transactions_df_2.withColumnRenamed('process-id', 'participant-id')
transactions_df_1.union(transactions_df_2)
5 分钟内完成!
两种方法都给出了正确的结果。
问题
我不明白为什么一个这么慢另一个不慢。第二种方法中排除的数据量很小。所以 transactions_df_2 只占总数据的一小部分。
查看计划,影响主要围绕在方法 1 而非方法 2 中完成的笛卡尔积。所以我认为,这是性能瓶颈。我仍然不明白这怎么会导致计算时间相差 40 分钟。
谁能解释一下?
DAG 中的笛卡尔积通常是 Spark 中的警告标志吗?
总结
条件中包含多个列的连接似乎触发了极慢的笛卡尔积运算。我是否应该对较小的数据集进行广播操作来避免这种情况?
DAG 方法 1
DAG 方法 2
这是因为 Cartesian Product join
和常规 join
不涉及相同的数据混洗过程。即使数据量相似,洗牌量也不同。
这 article 解释了额外洗牌的来源。