Spark job collapses into a single partition but I do not understand why
I am trying to tune a Spark job.
I am running it on Databricks, and at some point I saw this picture:
Note that in stage 12 there is only one partition, which means there is no parallelism at all. How can I figure out what is causing this? To be sure, there is no 'repartition(1)' anywhere in my code.
Adding the (slightly obfuscated) code:
spark.read(cid, location).createOrReplaceTempView("some_parquets")
parquets = spark.profile_paqrquet_df(cid)
parquets.where("year = 2018 and month = 5 and day = 18 and sm_device_source = 'js'") \
    .createOrReplaceTempView("parquets")
# join between two dataframes.
spark.sql(
"""
SELECT {fields}
FROM some_parquets
WHERE some_parquets.a = 'js'
AND some_parquets.b = 'normal'
AND date_f >= to_date('2018-05-01')
AND date_f < to_date('2018-05-05')
limit {limit}
""".format(limit=1000000, fields=",".join(fields))
).createOrReplaceTempView("some_parquets")
join_result = spark.sql(
"""
SELECT
parquets.some_field,
struct(some_parquets.*) as some_parquets
FROM some_parquets
LEFT ANTI JOIN some_ids ON some_parquets.sid = some_ids.sid
LEFT OUTER JOIN parquets ON some_parquets.uid = parquets.uid
""".format(some_ids=some_ids)
)
# turn items in each partition into vectors for machine learning
vectors = join_result \
.rdd \
.mapPartitions(extract)
# write vectors to file system. This evaluates the results
dump_vectors(vectors, output_folder)
Session construction:
spark = SparkSession \
.builder \
.appName("...") \
.config("spark.sql.shuffle.partitions", 1000) \
.getOrCreate()
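One way to narrow down where the parallelism disappears (a minimal diagnostic sketch, not from the original post, assuming the temp views and join_result defined above) is to print the partition count of each intermediate result and inspect the physical plan:

# Diagnostic sketch: check how many partitions each step produces.
limited_df = spark.table("some_parquets")      # the SELECT ... LIMIT query registered above
print(limited_df.rdd.getNumPartitions())       # if this prints 1, the LIMIT is the culprit
print(join_result.rdd.getNumPartitions())      # partitions feeding mapPartitions / the write
# The physical plan typically shows a GlobalLimit with a single-partition exchange
# when a LIMIT collapses the data.
join_result.explain()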
In case anyone is still interested in the answer: in short, it is because of the limit clause. Strangely enough, the limit clause collapses the data into a single partition after the shuffle stage.
A sample run on my local spark-shell:
scala> spark.sql("Select * from temp limit 1").rdd.partitions.size
res28: Int = 1
scala> spark.sql("Select * from temp").rdd.partitions.size
res29: Int = 16
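If the downstream joins and mapPartitions still need parallelism after the limit, one possible workaround (a sketch, not part of the original answer) is to explicitly repartition the limited result before the heavy work:

# Workaround sketch (an assumption, not stated in the original answer):
# spread the rows back out after the single-partition LIMIT so the joins,
# mapPartitions and write run in parallel again.
limited = spark.table("some_parquets")  # the limited temp view registered above
# 200 is an arbitrary example count; pick something close to the cluster's core count
limited.repartition(200).createOrReplaceTempView("some_parquets")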