Spark show cost based optimizer statistics
I tried to enable the Spark CBO by setting this property in spark-shell:

spark.conf.set("spark.sql.cbo.enabled", true)

I then ran spark.sql("ANALYZE TABLE events COMPUTE STATISTICS").show

Running this query does not show any statistics: spark.sql("select * from events where eventID=1").explain(true)

This is on Spark 2.2.1:
scala> spark.sql("select * from events where eventID=1").explain()
== Physical Plan ==
*Project [buyDetails.capacity#923, buyDetails.clearingNumber#924, buyDetails.leavesQty#925L, buyDetails.liquidityCode#926, buyDetails.orderID#927, buyDetails.side#928, cancelQty#929L, capacity#930, clearingNumber#931, contraClearingNumber#932, desiredLeavesQty#933L, displayPrice#934, displayQty#935L, eventID#936, eventTimestamp#937L, exchange#938, executionCodes#939, fillID#940, handlingInstructions#941, initiator#942, leavesQty#943L, nbbPrice#944, nbbQty#945L, nboPrice#946, ... 29 more fields]
+- *Filter (isnotnull(eventID#936) && (cast(eventID#936 as int) = 1))
+- *FileScan parquet default.events[buyDetails.capacity#923,buyDetails.clearingNumber#924,buyDetails.leavesQty#925L,buyDetails.liquidityCode#926,buyDetails.orderID#927,buyDetails.side#928,cancelQty#929L,capacity#930,clearingNumber#931,contraClearingNumber#932,desiredLeavesQty#933L,displayPrice#934,displayQty#935L,eventID#936,eventTimestamp#937L,exchange#938,executionCodes#939,fillID#940,handlingInstructions#941,initiator#942,leavesQty#943L,nbbPrice#944,nbbQty#945L,nboPrice#946,... 29 more fields] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/asehgal/data/events], PartitionFilters: [], PushedFilters: [IsNotNull(eventID)], ReadSchema: struct<buyDetails.capacity:string,buyDetails.clearingNumber:string,buyDetails.leavesQty:bigint,bu...
For me, the statistics are not visible in df.explain(true) either. After some experimenting, I found they can be printed with println(df.queryExecution.stringWithStats). Full example:
import org.apache.spark.sql.SparkSession

val ss = SparkSession
  .builder()
  .master("local[*]")
  .appName("TestCBO")
  .config("spark.sql.cbo.enabled", true)
  .getOrCreate()

import ss.implicits._

// two tables of different size, with column statistics computed
val df1 = ss.range(10000L).toDF("i")
df1.write.mode("overwrite").saveAsTable("table1")

val df2 = ss.range(100000L).toDF("i")
df2.write.mode("overwrite").saveAsTable("table2")

ss.sql("ANALYZE TABLE table1 COMPUTE STATISTICS FOR COLUMNS i")
ss.sql("ANALYZE TABLE table2 COMPUTE STATISTICS FOR COLUMNS i")

val df = ss.table("table1").join(ss.table("table2"), "i")
  .where($"i" > 1000)

println(df.queryExecution.stringWithStats)
which prints:
== Optimized Logical Plan ==
Project [i#2554L], Statistics(sizeInBytes=147.2 KB, rowCount=9.42E+3, hints=none)
+- Join Inner, (i#2554L = i#2557L), Statistics(sizeInBytes=220.8 KB, rowCount=9.42E+3, hints=none)
:- Filter (isnotnull(i#2554L) && (i#2554L > 1000)), Statistics(sizeInBytes=140.6 KB, rowCount=9.00E+3, hints=none)
: +- Relation[i#2554L] parquet, Statistics(sizeInBytes=156.3 KB, rowCount=1.00E+4, hints=none)
+- Filter ((i#2557L > 1000) && isnotnull(i#2557L)), Statistics(sizeInBytes=1546.9 KB, rowCount=9.90E+4, hints=none)
+- Relation[i#2557L] parquet, Statistics(sizeInBytes=1562.5 KB, rowCount=1.00E+5, hints=none)
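Independently of the plan output, you can verify that ANALYZE actually persisted the table-level statistics. As a sketch (assuming table1 was analyzed as above; DESCRIBE EXTENDED prints a Statistics row once statistics exist):

// Sanity check: the DESCRIBE EXTENDED output should contain a "Statistics"
// row with sizeInBytes and rowCount after ANALYZE TABLE has run
ss.sql("DESCRIBE EXTENDED table1")
  .filter($"col_name" === "Statistics")
  .show(false)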
These statistics are not shown in the standard df.explain, because that triggers (Dataset.scala):

ExplainCommand(queryExecution.logical, extended = true) // cost = false in this constructor

To enable the cost output, we can invoke this ExplainCommand ourselves:
import org.apache.spark.sql.execution.command.ExplainCommand

val explain = ExplainCommand(df.queryExecution.logical, extended = true, cost = true)
ss.sessionState.executePlan(explain).executedPlan.executeCollect().foreach { r =>
  println(r.getString(0))
}
Here you can also enable output of the generated code by setting codegen = true.
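For example, a minimal sketch reusing df and ss from above (in Spark 2.2, ExplainCommand also takes a codegen flag):

// Same pattern as above, but dumping the whole-stage generated code instead
val explainCodegen = ExplainCommand(df.queryExecution.logical, codegen = true)
ss.sessionState.executePlan(explainCodegen).executedPlan.executeCollect().foreach { r =>
  println(r.getString(0))
}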
Alternatively, this gives similar output:
df // join of the two tables plus the filter, as above
  .createOrReplaceTempView("tmp") // registerTempTable is deprecated since Spark 2.0
ss.sql("EXPLAIN COST select * from tmp").show(false)
To see the statistics in the Spark UI, you have to go to the SQL tab and then select the corresponding query (here, the one triggered by df.show()).
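Note that a query only appears in the SQL tab after an action has been executed; a minimal sketch (the UI of a local session is served on port 4040 by default):

// Trigger an action so the query shows up under http://localhost:4040/SQL/
df.show()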