
Spark job collapses into a single partition but I do not understand why

I am trying to tune a Spark job.

I am running it on Databricks, and at some point I saw this picture:

Note that in stage 12 I have only one partition, which means there is no parallelism. How can I work out what causes this? To be sure, there is no 'repartition(1)' anywhere in my code.
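One way to narrow this down is to print the partition count of each intermediate DataFrame and inspect the physical plan, which shows operators (such as a global limit) that force data through a single partition. A minimal, self-contained sketch with made-up data (not code from the actual job):

# Diagnostic sketch: check how many partitions each step produces and look at
# the physical plan. The data and step names here are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-debug").getOrCreate()

source = spark.range(0, 1_000_000)        # stand-in for the real input
filtered = source.where("id % 2 = 0")     # narrow transformation, keeps partitioning
limited = filtered.limit(1000)            # candidate culprit to test

for name, df in [("source", source), ("filtered", filtered), ("limited", limited)]:
    print(name, df.rdd.getNumPartitions())

# explain() prints the physical plan; a GlobalLimit / CollectLimit node here is
# a strong hint about where the parallelism is lost.
limited.explain()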

Adding the (slightly obfuscated) code:

spark.read(cid, location).createOrReplaceTempView("some_parquets")

parquets = spark.profile_paqrquet_df(cid)

parquets.where("year = 2018 and month = 5 and day = 18 and sm_device_source = 'js'"
        .createOrReplaceTempView("parquets")

# join between two dataframes.  

spark.sql(
    """
        SELECT     {fields}
        FROM       some_parquets  
        WHERE      some_parquets.a = 'js' 
        AND        some_parquets.b = 'normal' 
        AND        date_f >= to_date('2018-05-01') 
        AND date_f < to_date('2018-05-05') 
        limit {limit}
    """.format(limit=1000000, fields=",".join(fields))
).createOrReplaceTempView("some_parquets")

join_result = spark.sql(
    """
        SELECT 
                   parquets.some_field, 
                   struct(some_parquets.*) as some_parquets
        FROM       some_parquets  
        LEFT ANTI JOIN some_ids ON some_parquets.sid = some_ids.sid 
        LEFT OUTER JOIN parquets ON some_parquets.uid = parquets.uid   
    """.format(some_ids=some_ids)
)

# turn items in each partition into vectors for machine learning
vectors = join_result \
    .rdd \
    .mapPartitions(extract)

# write vectors to file system. This evaluates the results
dump_vectors(vectors, output_folder) 

Session construction:

spark = SparkSession \
        .builder \
        .appName("...") \
        .config("spark.sql.shuffle.partitions", 1000) \
        .getOrCreate()

If anyone is still interested in the answer: in short, it is because of the limit clause. Oddly enough, the limit clause collapses the data into a single partition after the shuffle stage.

A sample run on my local spark-shell:
scala> spark.sql("Select * from temp limit 1").rdd.partitions.size
res28: Int = 1

scala> spark.sql("Select * from temp").rdd.partitions.size
res29: Int = 16
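
If parallelism is needed downstream of the limit, one mitigation (my assumption, not part of the original answer) is to repartition the limited result before the expensive work, so only the limit itself runs on a single partition. A minimal PySpark sketch, reusing the temp-view idea from above:

# Workaround sketch (assumption): repartition right after the limit so the
# stages that follow regain parallelism. Names and sizes are illustrative only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("limit-repartition-demo").getOrCreate()
spark.range(0, 1_000_000).createOrReplaceTempView("temp")

limited = spark.sql("SELECT * FROM temp LIMIT 1000")
print(limited.rdd.getNumPartitions())    # 1 -- collapsed by the limit

respread = limited.repartition(16)       # shuffle the bounded result back out
print(respread.rdd.getNumPartitions())   # 16 -- downstream stages run in parallel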