使用 extraOptimizations 转换 Spark SQL AST

Transforming Spark SQL AST with extraOptimizations

我想将 SQL 字符串作为用户输入,然后在执行前对其进行转换。特别是,我想修改顶级投影(select 子句),注入要由查询检索的其他列。

我希望通过使用 sparkSession.experimental.extraOptimizations 连接到 Catalyst 来实现这一点。我知道我正在尝试的并不是严格意义上的优化(转换改变了 SQL 语句的语义),但 API 似乎仍然合适。但是,查询执行器似乎忽略了我的转换。

这里是一个最小的例子来说明我遇到的问题。先定义一个行case class:

case class TestRow(a: Int, b: Int, c: Int)

然后定义一个简单丢弃任何投影的优化规则:

object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
        case x: Project => x.child
    }
}

现在创建一个数据集,注册优化,然后运行一个SQL查询:

// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)

// Register "optimisation".
sparkSession.experimental.extraOptimizations =  
    Seq(RemoveProjectOptimisationRule)

// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")

// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)

这是输出:

Query result: 
[1]

== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
   +- 'UnresolvedRelation `testtable`

== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
   +- SubqueryAlias testtable
      +- LocalRelation [a#3, b#4, c#5]

== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]

== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]

我们看到结果与原始 SQL 语句的结果相同,但未应用转换。然而,在打印逻辑和物理计划时,投影确实被删除了。我还确认(通过调试日志输出)转换确实被调用了。

关于这里发生的事情有什么建议吗?也许优化器只是忽略了改变语义的"optimisations"?

如果使用优化不是可行的方法,有人可以提出替代方案吗?我真正想做的就是解析输入的 SQL 语句,对其进行转换,并将转换后的 AST 传递给 Spark 执行。但据我所知,执行此操作的 APIs 是 Spark sql 包私有的。也许可以使用反射,但我想避免这种情况。

任何指点将不胜感激。

如您所料,这无法正常工作,因为我们假设优化器不会更改查询结果。

具体来说,我们缓存分析器产生的模式(并假设优化器不会更改它)。将行转换为外部格式时,我们使用此模式,因此会截断结果中的列。如果你做的不仅仅是截断(即更改数据类型),这甚至可能会崩溃。

您可以在 this notebook, it is in fact producing the result you would expect under the covers. We are planning to open up more hooks at some point in the near future that would let you modify the plan at other phases of query execution. See SPARK-18127 中看到更多详细信息。

Michael Armbrust 的回答证实了这种转换不应该通过优化来完成。

我转而使用 Spark 中的内部 API 来实现我现在想要的转换。它需要在 Spark 中包私有的方法。因此,我们可以通过将相关逻辑放在适当的包中来访问它们而无需反射。大纲:

// Must be in the spark.sql package.
package org.apache.spark.sql

object SQLTransformer {
    def apply(sparkSession: SparkSession, ...) = {

        // Get the AST.
        val ast = sparkSession.sessionState.sqlParser.parsePlan(sql)

        // Transform the AST.
        val transformedAST = ast match {
            case node: Project => // Modify any top-level projection 
            ...
        }

        // Create a dataset directly from the AST.
        Dataset.ofRows(sparkSession, transformedAST)
    }
}

请注意,这当然可能会与未来版本的 Spark 中断。