如何通过JDBC接口在SchemaRDD上启用SQL? (甚至可能吗?)

How to enable SQL on SchemaRDD via the JDBC interface? (is it even possible ?)

更新问题陈述

我们正在使用 spark 1.2.0 (Hadoop 2.4)。我们已经使用 HDFS 中的数据文件定义了 SchemaRDD,并希望能够通过 HiveServer2 将这些作为表进行查询。我们在尝试 saveAsTable 时遇到运行时异常,希望获得有关如何继续的指导。

源代码:

package foo.bar

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.hive._

object HiveDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Demo")
    val sc = new SparkContext(conf)

    // sc is an existing SparkContext.
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Create an RDD
    val zipRDD = sc.textFile("/model-inputs/all_zip_state.csv")

    // The schema is encoded in a string
    val schemaString = "ODSMEMBERID,ZIPCODE,STATE,TEST_SUPPLIERID,ratio_death_readm_low,ratio_death_readm_high,regions"

    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (zip) to Rows.
    val rowRDD = zipRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), ""))

    // Apply the schema to the RDD.
    val zipSchemaRDD = hiveContext.applySchema(rowRDD, schema)

    // HiveContext's save as Table
    zipSchemaRDD.saveAsTable("allzipstable")

  }
}

spark 提交命令:

./bin/spark-submit  --class foo.bar.HiveDemo --master yarn-cluster --jars /usr/lib/hive/lib/hive-metastore.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 lib/datapipe_2.10-1.0.jar 10

节点运行时异常:

15/01/29 22:35:50 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:83)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:36)
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:427)
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook

另一次尝试:

package foo.bar

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql._

case class AllZips(
  ODSMEMBERID: String,
  ZIPCODE: String,
  STATE: String,
  TEST_SUPPLIERID: String,
  ratio_death_readm_low: String,
  ratio_death_readm_high: String,
  regions: String)

object HiveDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HiveDemo")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext._
    val allZips = sc.textFile("/model-inputs/all_zip_state.csv").map(_.split(",")).map(p => AllZips(p(0), p(1), p(2), p(3), p(4), p(5), ""))
    val allZipsSchemaRDD = createSchemaRDD(allZips)
    allZipsSchemaRDD.saveAsTable("allzipstable")
  }
}

节点异常:

15/01/30 00:28:19 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:83)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:22)
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:427)
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook

您需要使用 HiveContext

这是 java/scala 文档:

   * Note that this currently only works with SchemaRDDs that are created from a HiveContext as
   * there is no notion of a persisted catalog in a standard SQL context. 


  @Experimental
  def saveAsTable(tableName: String): Unit =
    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd

因此在您的代码中将其更改为:

val sc = new HiveContext(conf)

实际上你应该将它重命名为

val sqlc = new HiveContext(conf)

仅供参考:有关注册 tables 的更多信息(在 SQLContext 中):请注意,如果以这种方式完成,tables 是暂时的:

  /**
   *  Temporary tables exist only
   * during the lifetime of this instance of SQLContext.
   *
   * @group userf
   */
  def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
    catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
  }

更新 您的新堆栈跟踪包括以下短语:

Unresolved plan found, tree:

这通常意味着您有一个与基础 table 不匹配的列。我会进一步查看是否能够隔离 - 但同时您也可以从那个角度考虑。

上面的 createSchemaRDD 代码片段在 spark 1.2.1 上工作正常

1.2.0 中存在 CTAS 缺陷