为什么 Spark crossJoin 对于一个小数据框需要这么长时间?
Why does Spark crossJoin take so long for a tiny dataframe?
我正在尝试对两个数据帧执行以下交叉连接,每个数据帧有 5 行,但 Spark 在我的机器上生成了 40000 个任务,完成任务需要 30 秒。知道为什么会这样吗?
df = spark.createDataFrame([['1','1'],['2','2'],['3','3'],['4','4'],['5','5']]).toDF('a','b')
df = df.repartition(1)
df.select('a').distinct().crossJoin(df.select('b').distinct()).count()
两件事 - 看起来您无法直接控制创建 DF 的分区数,因此我们可以先创建一个 RDD(您可以在其中指定分区数)并将其转换为 DF。您也可以将随机分区设置为“1”。这些都确保您在整个执行过程中只有 1 个分区,并且应该加快速度。
请注意,对于设计 Spark 的较大数据集来说,这根本不是问题(在这种大小的数据集上实现相同的结果会更快,而不使用 spark)。所以在一般情况下你真的不需要做这样的事情,但是调整分区的数量到你的 resources/data.
spark.conf.set("spark.default.parallelism", "1")
spark.conf.set("spark.sql.shuffle.partitions", "1")
df = sc.parallelize([['1','1'],['2','2'],['3','3'],['4','4'],['5','5']], 1).toDF(['a','b'])
df.select('a').distinct().crossJoin(df.select('b').distinct()).count()
spark.conf.set
仅为单次执行设置配置,如果您想要更多永久更改,请在实际的 spark conf 文件中进行设置
你在join之前调用了一个.distinct
,它需要一个shuffle,所以它会根据spark.sql.shuffle.partitions 属性值重新分区数据。因此,df.select('a').distinct()
和 df.select('b').distinct()
产生了新的 DataFrame,每个都有 200 个分区,200 x 200 = 40000
我正在尝试对两个数据帧执行以下交叉连接,每个数据帧有 5 行,但 Spark 在我的机器上生成了 40000 个任务,完成任务需要 30 秒。知道为什么会这样吗?
df = spark.createDataFrame([['1','1'],['2','2'],['3','3'],['4','4'],['5','5']]).toDF('a','b')
df = df.repartition(1)
df.select('a').distinct().crossJoin(df.select('b').distinct()).count()
两件事 - 看起来您无法直接控制创建 DF 的分区数,因此我们可以先创建一个 RDD(您可以在其中指定分区数)并将其转换为 DF。您也可以将随机分区设置为“1”。这些都确保您在整个执行过程中只有 1 个分区,并且应该加快速度。
请注意,对于设计 Spark 的较大数据集来说,这根本不是问题(在这种大小的数据集上实现相同的结果会更快,而不使用 spark)。所以在一般情况下你真的不需要做这样的事情,但是调整分区的数量到你的 resources/data.
spark.conf.set("spark.default.parallelism", "1")
spark.conf.set("spark.sql.shuffle.partitions", "1")
df = sc.parallelize([['1','1'],['2','2'],['3','3'],['4','4'],['5','5']], 1).toDF(['a','b'])
df.select('a').distinct().crossJoin(df.select('b').distinct()).count()
spark.conf.set
仅为单次执行设置配置,如果您想要更多永久更改,请在实际的 spark conf 文件中进行设置
你在join之前调用了一个.distinct
,它需要一个shuffle,所以它会根据spark.sql.shuffle.partitions 属性值重新分区数据。因此,df.select('a').distinct()
和 df.select('b').distinct()
产生了新的 DataFrame,每个都有 200 个分区,200 x 200 = 40000