为什么 Spark DataFrame 创建了错误数量的分区?
Why Spark DataFrame is creating wrong number of partitions?
我有一个包含 2 列的 spark 数据框 - col1
和 col2
。
scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
当我以 parquet
格式在磁盘上写入 df
时,为了将所有数据写入等于 col1
中唯一值数量的文件,我做了一个 repartition
使用 col1
,像这样:
scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")
以上代码在文件系统中只生成一个文件。但是,shuffle的次数变成了200次。
我在这里无法理解一件事,如果 col1
仅包含一个值,即 1
那么为什么要在 repartition
中创建 200 个分区?
在Spark SQL shuffle world中,默认的shuffle partition数量为200,由spark.sql.shuffle.partitions
控制
repartition(columnName)
默认创建 200 个分区(更具体地说,spark.sql.shuffle.partitions
分区),无论 col1
有多少个唯一值。如果 col1
只有 1 个唯一值,则 199 个分区是空的。另一方面,如果 col1
的唯一值超过 200 个,每个分区将有多个 col1
值。
如果您只想要 1 个分区,那么您可以 repartition(1,col("col1"))
或 coalesce(1)
。但并不是说 coalesce
的行为不一样,因为 coalesce
我在你的代码中被进一步向上移动,你可能会失去并行性(参见 )
如果你想查看你分区的内容,我已经为这个做了2个方法:
// calculates record count per partition
def inspectPartitions(df: DataFrame) = {
import df.sqlContext.implicits._
df.rdd.mapPartitions(partIt => {
Iterator(partIt.toSeq.size)
}
).toDF("record_count")
}
// inspects how a given key is distributed accross the partition of a dataframe
def inspectPartitions(df: DataFrame, key: String) = {
import df.sqlContext.implicits._
df.rdd.mapPartitions(partIt => {
val part = partIt.toSet
val partSize = part.size
val partKeys = part.map(r => r.getAs[Any](key).toString.trim)
val partKeyStr = partKeys.mkString(", ")
val partKeyCount = partKeys.size
Iterator((partKeys.toArray,partSize))
}
).toDF("partitions","record_count")
}
现在您可以像这样检查你的数据框:
inspectPartitions(df.repartition(col("col1"),"col1")
.where($"record_count">0)
.show
我有一个包含 2 列的 spark 数据框 - col1
和 col2
。
scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
当我以 parquet
格式在磁盘上写入 df
时,为了将所有数据写入等于 col1
中唯一值数量的文件,我做了一个 repartition
使用 col1
,像这样:
scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")
以上代码在文件系统中只生成一个文件。但是,shuffle的次数变成了200次。
我在这里无法理解一件事,如果 col1
仅包含一个值,即 1
那么为什么要在 repartition
中创建 200 个分区?
在Spark SQL shuffle world中,默认的shuffle partition数量为200,由spark.sql.shuffle.partitions
repartition(columnName)
默认创建 200 个分区(更具体地说,spark.sql.shuffle.partitions
分区),无论 col1
有多少个唯一值。如果 col1
只有 1 个唯一值,则 199 个分区是空的。另一方面,如果 col1
的唯一值超过 200 个,每个分区将有多个 col1
值。
如果您只想要 1 个分区,那么您可以 repartition(1,col("col1"))
或 coalesce(1)
。但并不是说 coalesce
的行为不一样,因为 coalesce
我在你的代码中被进一步向上移动,你可能会失去并行性(参见
如果你想查看你分区的内容,我已经为这个做了2个方法:
// calculates record count per partition
def inspectPartitions(df: DataFrame) = {
import df.sqlContext.implicits._
df.rdd.mapPartitions(partIt => {
Iterator(partIt.toSeq.size)
}
).toDF("record_count")
}
// inspects how a given key is distributed accross the partition of a dataframe
def inspectPartitions(df: DataFrame, key: String) = {
import df.sqlContext.implicits._
df.rdd.mapPartitions(partIt => {
val part = partIt.toSet
val partSize = part.size
val partKeys = part.map(r => r.getAs[Any](key).toString.trim)
val partKeyStr = partKeys.mkString(", ")
val partKeyCount = partKeys.size
Iterator((partKeys.toArray,partSize))
}
).toDF("partitions","record_count")
}
现在您可以像这样检查你的数据框:
inspectPartitions(df.repartition(col("col1"),"col1")
.where($"record_count">0)
.show