Spark sql 查询导致分区计数 inflation

Spark sql query causes partition count inflation

Spark (1.5.2) 中的分区计数在某些 sql 查询中爆炸。

可以避免吗?



在我的例子中,我有三个表(文本、所有者、人员),我在这些表上执行以下查询:

sqlContext.sql(
                "SELECT t.* FROM texts t JOIN ("+
                        "SELECT o.TextId FROM "+
                        "owners o JOIN ("+
                        "SELECT UserId FROM person WHERE LOWER(name) "+
                        "RLIKE '.*"+escapeRegex(filter.name)+"\s*$'"+
                        ") p ON (o.UserId=p.UserId) GROUP BY o.TextId"+
                        ") o "+
                        "ON (t.TextId = o.TextId)")

查询前分区数为2,查询后为200textsDF.javaRDD().partitions().size()

Join/Group 的分区数或任何具有混洗的操作,取决于 属性 "spark.sql.shuffle.partitions"。这必须已在您的集群配置中设置为 200。

这个属性的重要性:这决定了对数据进行reducer(有点理解)操作的数量。通过将此 属性 设置得更高,您可以确保有足够的并行度。

无论如何,您可以根据需要更改 属性。 SparkConf 可以如下设置,任意数字。

conf.set("spark.sql.shuffle.partitions","2");

注意:将其设置得较低会降低性能,从而增加网络使用量并降低并行度。

另一方面,文件读取的并行度取决于默认并行度 属性,它表示每个核心的任务数/hdfs 数据中的块数。但是对于任何有shuffle的操作,它依赖于我提到的属性。