Spark Physical Plan false/true for Exchange partitioning

repartitionedDF.explain

shows this physical plan:

== Physical Plan ==
Exchange hashpartitioning(purchase_month#25, 10), false, [id=#6]
+- LocalTableScan [item#23, price#24, purchase_month#25]
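
For reference, a minimal sketch that produces a plan of this shape in a spark-shell session (the rows are invented; only the column names and the repartition into 10 partitions on purchase_month are inferred from the plan above):

import spark.implicits._

// hypothetical data; only the schema and repartition(10, ...) come from the plan shown
val df = Seq(("itemA", 1.0, "2020-01"), ("itemB", 2.0, "2020-02"))
  .toDF("item", "price", "purchase_month")
val repartitionedDF = df.repartition(10, 'purchase_month)
repartitionedDF.explain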

I have noticed that in some cases the false can also be true.

What does this mean? I knew it once, but have forgotten.

After some digging, I believe it refers to the noUserSpecifiedNumPartition variable. If you do a repartition, this boolean will be false, because you have specified the number of partitions; otherwise it is true. Try a plain orderBy and I think you should get true (a sketch follows).
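
A quick sketch of that suggestion (df is a hypothetical DataFrame with a series column, in a spark-shell session; the expression ids and the default 200 shuffle partitions will differ):

// a plain orderBy, with no user-specified partition count
df.orderBy('series).explain
// the Exchange line should look roughly like the one in the full plan further below:
// Exchange rangepartitioning(series#16 ASC NULLS FIRST, 200), true, [id=#...]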

I found this out by running

println(df.repartition('series).orderBy('series).queryExecution.executedPlan.prettyJson)

inspired by . It gives the following output (truncated to only the relevant parts):

{
  "class" : "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec",
  "num-children" : 1,
  "outputPartitioning" : [ {
    "class" : "org.apache.spark.sql.catalyst.plans.physical.RangePartitioning",
    "num-children" : 1,
    "ordering" : [ 0 ],
    "numPartitions" : 200
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.SortOrder",
    "num-children" : 1,
    "child" : 0,
    "direction" : {
      "object" : "org.apache.spark.sql.catalyst.expressions.Ascending$"
    },
    "nullOrdering" : {
      "object" : "org.apache.spark.sql.catalyst.expressions.NullsFirst$"
    },
    "sameOrderExpressions" : {
      "object" : "scala.collection.immutable.Set$EmptySet$"
    }
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "series",
    "dataType" : "string",
    "nullable" : true,
    "metadata" : { },
    "exprId" : {
      "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
      "id" : 16,
      "jvmId" : "35ee1aa5-f899-4fca-a8a6-a06c3eaabe5c"
    },
    "qualifier" : [ ]
  } ],
  "child" : 0,
  "noUserSpecifiedNumPartition" : true
}, {
  "class" : "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec",
  "num-children" : 1,
  "outputPartitioning" : [ {
    "class" : "org.apache.spark.sql.catalyst.plans.physical.HashPartitioning",
    "num-children" : 1,
    "expressions" : [ 0 ],
    "numPartitions" : 200
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "series",
    "dataType" : "string",
    "nullable" : true,
    "metadata" : { },
    "exprId" : {
      "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
      "id" : 16,
      "jvmId" : "35ee1aa5-f899-4fca-a8a6-a06c3eaabe5c"
    },
    "qualifier" : [ ]
  } ],
  "child" : 0,
  "noUserSpecifiedNumPartition" : false
}

where the true and false values correspond nicely to the physical plan:

df.repartition('series).orderBy('series).explain
== Physical Plan ==
*(1) Sort [series#16 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(series#16 ASC NULLS FIRST, 200), true, [id=#192]
   +- Exchange hashpartitioning(series#16, 200), false, [id=#190]
      +- FileScan csv [series#16,timestamp#17,value#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<series:string,timestamp:string,value:string>
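
Reading the two together: the Exchange hashpartitioning(series#16, 200) line comes from repartition('series) and prints false, matching noUserSpecifiedNumPartition = false in the JSON above, while the Exchange rangepartitioning(series#16 ASC NULLS FIRST, 200) line comes from orderBy('series) and prints true.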