Spark 物理计划中的重新分区和洗牌

Repartition and Shuffles in Spark Physical Plans

我正在尝试了解 Spark (2.4) 物理计划。我们通过 SQL API.

与 Spark 交互

我正在使用以下 sql。 sql 在步骤 1 中有一个聚合,在下一步中有一个连接操作。我的意图是在第 1 步之前 repartition 来源 table 以便我可以重新使用此 Exchange 并避免以下 Shuffles (Exchanges)步骤,但它没有按照我的预期工作。你能帮我理解我错在哪里吗?

    create or replace temporary view prsn_dtl as
    select
    policy_num,
    prsn_id,
    eff_dt,
    from db.person_details
    cluster by policy_num;
    
    create or replace temporary view plcy_dtl as
    select
    policy_num,
    role_desc,
    prsn_actv_flg
    from plcy_detail
    
    create or replace temporary view my_keys as
    select
    policy_num as policy_num,
    prsn_id as prsn_id,
    max(eff_dt) as eff_dt
    from prsn_dtl
    group by 1, 2;
    
    select
    keys.policy_num,
    keys.prsn_id,
    keys.eff_dt,
    plcy.role_desc,
    plcy.prsn_actv_flg
    from my_keys keys
    inner join plcy_dtl plcy
    on keys.policy_num = plcy.policy_num;

在 DAG 表示中我找到了 3 Exchanges -
第 1 步)第一个 hashpartitioning(policy_num#92, 200) 由于 cluster byaggregate
之前 第 2 步)第二步是在 hashpartitioning(policy_num#163, prsn_id#164, 200)
上的 Aggregate 运算符之间 步骤 3) 最后 hashpartitioning(policy_num#163) 在 sort-merge Join

之前

我的问题:
为什么上面第 1 步中的 Exchange(来自 cluster by)没有向下传播并且在排序合并连接之前的第 3 步中没有被重新使用。

我的期望是 Spark 将重用步骤 1 (cluster by) 中的 Exchange,并且不会在步骤 3 中添加另一个 Exchange(在 SMJ 之前),因为两者都是repartitioned 在 policy_num。

任何人都可以解释我哪里出错了。

更新: 物理计划:-

CollectLimit 1
        +- *(6) Project [policy_num#836, prsn_id#837, eff_dt#838, role_desc#304, prsn_actv_flg#306]
          +- *(6) SortMergeJoin [policy_num#836], [policy_num#300], Inner 
    :       +- *(3) Sort [policy_num#836 ASC NULLS FIRST], false, 0
    :        +- Exchange hashpartitioning(policy_num#836, 200)
    :         +- *(2) HashAggregate(keys=policy_num#801, prsn_id#802], functions=[max(eff_dt)], output=[policy_num#836, prsn_id#837, eff_dt#838])
    :           +- *(2) HashAggregate(keys=[policy_num#801, prsn_id#802], functions=[partial_max(eff_dt#803)], output=[policy#801, prsn_id#802, max#847]) 
    :             +- *(2) Sort [policy_num#801 ASC NULLS FIRST], false, 0 
    :               +- Exchange hashpartitioning(policy_num#801, 200)
    :                +- *(1) Project [policy_num#801, prsn_id#802, eff_dt#803]
    :                  +- *(1) Filter isnotnull(policy_num#801)     
    :                   +-   *(1) Filescan parquet testdb.prsn_details[policy_num#801,prsn_id#802,eff_dt#803] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test_db.prsn_details/20200505..., PartitionFilters: [], PushFilters: [IsNotNull(policy_num)], ReadSchema: struct<policy_num:string, prsn_id:string, eff_dt:date>              
               +- *(5) Sort [policy_num#300 ASC NULLS FIRST], false, 0  
                +- Exchange hashpartitioning(policy_num#300, 200)
                  +- *(4) Project [policy_num#300, role_desc#304, prsn_actv_flg#306]
                   +- *(4) Filter (((trim(prsn_actv_flg#306, None) = ) ................
                     +- *(4) Filescan parquet
plcy_detail[policy_num#300,role_desc#304,prsn_actv_flg#306] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[s3://test_db.plcy_details/20200505..., PartitionFilters: [], PushFilters: [IsNotNull(policy_num)], ReadSchema: struct<policy_num:string, role_desc:string, prsn_actv_flg:string>                     

更新 II:(解决方案):- 从 GROUP BY 查询中删除了列重命名 (AS),现在正在重新交换在查询计划中使用:

create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;

谢谢

由于您按 policy_num 和 prsn_id 分组,因此您需要重新洗牌,然后再次洗牌才能加入。在这种情况下,初始集群 by 是不必要的。您必须至少洗牌 2 次:1) 在 my_keys 中分组,2) 加入。

您指定的查询是否正是您在此处提到的查询?或者它是更大查询的一部分?如果 aggregate 键是 re-partitioned 键的子集,Spark 将 re-use exchange。问题可能是因为列在您的查询之间被重命名。如果您的查询有别名,您可能需要删除它们,然后再次检查。