如何通过JDBC接口在SchemaRDD上启用SQL? (甚至可能吗?)
How to enable SQL on SchemaRDD via the JDBC interface? (is it even possible ?)
更新问题陈述
我们正在使用 spark 1.2.0 (Hadoop 2.4)。我们已经使用 HDFS 中的数据文件定义了 SchemaRDD,并希望能够通过 HiveServer2 将这些作为表进行查询。我们在尝试 saveAsTable 时遇到运行时异常,希望获得有关如何继续的指导。
源代码:
package foo.bar
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.hive._
object HiveDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Demo")
val sc = new SparkContext(conf)
// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Create an RDD
val zipRDD = sc.textFile("/model-inputs/all_zip_state.csv")
// The schema is encoded in a string
val schemaString = "ODSMEMBERID,ZIPCODE,STATE,TEST_SUPPLIERID,ratio_death_readm_low,ratio_death_readm_high,regions"
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (zip) to Rows.
val rowRDD = zipRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), ""))
// Apply the schema to the RDD.
val zipSchemaRDD = hiveContext.applySchema(rowRDD, schema)
// HiveContext's save as Table
zipSchemaRDD.saveAsTable("allzipstable")
}
}
spark 提交命令:
./bin/spark-submit --class foo.bar.HiveDemo --master yarn-cluster --jars /usr/lib/hive/lib/hive-metastore.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 lib/datapipe_2.10-1.0.jar 10
节点运行时异常:
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:83)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:36)
at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:427)
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook
另一次尝试:
package foo.bar
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql._
case class AllZips(
ODSMEMBERID: String,
ZIPCODE: String,
STATE: String,
TEST_SUPPLIERID: String,
ratio_death_readm_low: String,
ratio_death_readm_high: String,
regions: String)
object HiveDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("HiveDemo")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
val allZips = sc.textFile("/model-inputs/all_zip_state.csv").map(_.split(",")).map(p => AllZips(p(0), p(1), p(2), p(3), p(4), p(5), ""))
val allZipsSchemaRDD = createSchemaRDD(allZips)
allZipsSchemaRDD.saveAsTable("allzipstable")
}
}
节点异常:
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:83)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:22)
at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:427)
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook
您需要使用 HiveContext
这是 java/scala 文档:
* Note that this currently only works with SchemaRDDs that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context.
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd
因此在您的代码中将其更改为:
val sc = new HiveContext(conf)
实际上你应该将它重命名为
val sqlc = new HiveContext(conf)
仅供参考:有关注册 tables 的更多信息(在 SQLContext 中):请注意,如果以这种方式完成,tables 是暂时的:
/**
* Temporary tables exist only
* during the lifetime of this instance of SQLContext.
*
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
}
更新 您的新堆栈跟踪包括以下短语:
Unresolved plan found, tree:
这通常意味着您有一个与基础 table 不匹配的列。我会进一步查看是否能够隔离 - 但同时您也可以从那个角度考虑。
上面的 createSchemaRDD 代码片段在 spark 1.2.1 上工作正常
1.2.0 中存在 CTAS 缺陷
更新问题陈述
我们正在使用 spark 1.2.0 (Hadoop 2.4)。我们已经使用 HDFS 中的数据文件定义了 SchemaRDD,并希望能够通过 HiveServer2 将这些作为表进行查询。我们在尝试 saveAsTable 时遇到运行时异常,希望获得有关如何继续的指导。
源代码:
package foo.bar
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.hive._
object HiveDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Demo")
val sc = new SparkContext(conf)
// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Create an RDD
val zipRDD = sc.textFile("/model-inputs/all_zip_state.csv")
// The schema is encoded in a string
val schemaString = "ODSMEMBERID,ZIPCODE,STATE,TEST_SUPPLIERID,ratio_death_readm_low,ratio_death_readm_high,regions"
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (zip) to Rows.
val rowRDD = zipRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), ""))
// Apply the schema to the RDD.
val zipSchemaRDD = hiveContext.applySchema(rowRDD, schema)
// HiveContext's save as Table
zipSchemaRDD.saveAsTable("allzipstable")
}
}
spark 提交命令:
./bin/spark-submit --class foo.bar.HiveDemo --master yarn-cluster --jars /usr/lib/hive/lib/hive-metastore.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 lib/datapipe_2.10-1.0.jar 10
节点运行时异常:
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:83)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:36)
at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:427)
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook
另一次尝试:
package foo.bar
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql._
case class AllZips(
ODSMEMBERID: String,
ZIPCODE: String,
STATE: String,
TEST_SUPPLIERID: String,
ratio_death_readm_low: String,
ratio_death_readm_high: String,
regions: String)
object HiveDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("HiveDemo")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
val allZips = sc.textFile("/model-inputs/all_zip_state.csv").map(_.split(",")).map(p => AllZips(p(0), p(1), p(2), p(3), p(4), p(5), ""))
val allZipsSchemaRDD = createSchemaRDD(allZips)
allZipsSchemaRDD.saveAsTable("allzipstable")
}
}
节点异常:
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:83)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun.applyOrElse(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:22)
at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:427)
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook
您需要使用 HiveContext
这是 java/scala 文档:
* Note that this currently only works with SchemaRDDs that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context.
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd
因此在您的代码中将其更改为:
val sc = new HiveContext(conf)
实际上你应该将它重命名为
val sqlc = new HiveContext(conf)
仅供参考:有关注册 tables 的更多信息(在 SQLContext 中):请注意,如果以这种方式完成,tables 是暂时的:
/**
* Temporary tables exist only
* during the lifetime of this instance of SQLContext.
*
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
}
更新 您的新堆栈跟踪包括以下短语:
Unresolved plan found, tree:
这通常意味着您有一个与基础 table 不匹配的列。我会进一步查看是否能够隔离 - 但同时您也可以从那个角度考虑。
上面的 createSchemaRDD 代码片段在 spark 1.2.1 上工作正常
1.2.0 中存在 CTAS 缺陷