Apache Spark with Hive
How do I read/write data from/to Hive?
Do I need to build Spark with the hive profile in order to interact with Hive?
Which Maven dependencies are needed to interact with Hive?
I could not find good documentation that walks through using Hive step by step.
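(For the build questions: against a prebuilt Spark distribution, Hive access only needs the spark-hive artifact on the application classpath; it is only when compiling Spark itself from source that the -Phive and -Phive-thriftserver Maven profiles come into play. Below is a minimal SBT sketch; the same org.apache.spark:spark-hive_2.10 coordinates work from Maven, and the version numbers are assumptions that should be aligned with your cluster.)

// build.sbt -- sketch only; align the versions with your Spark installation
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.5.1" % "provided",
  // spark-hive is what provides HiveContext and the Hive metastore client
  "org.apache.spark" %% "spark-hive" % "1.5.1" % "provided"
)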
This is my current code:
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sqlCon = new SQLContext(sc)
val schemaString = "Date:string,Open:double,High:double,Low:double,Close:double,Volume:double,Adj_Close:double"
val schema =
  StructType(
    schemaString.split(",").map(fieldName =>
      StructField(fieldName.split(":")(0), getFieldTypeInSchema(fieldName.split(":")(1)), true)))
val rdd = sc.textFile("hdfs://45.55.159.119:9000/yahoo_stocks.csv")
//val rdd = sc.parallelize(arr)
val rowRDDx = noHeader.map(p => {
  var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any]
  var index = 0
  val regex = rowSplittingRegexBuilder(Seq(","))
  var tokens = p.split(regex)
  tokens.foreach(value => {
    var valType = schema.fields(index).dataType
    var returnVal: Any = null
    valType match {
      case IntegerType   => returnVal = value.toString.toInt
      case DoubleType    => returnVal = value.toString.toDouble
      case LongType      => returnVal = value.toString.toLong
      case FloatType     => returnVal = value.toString.toFloat
      case ByteType      => returnVal = value.toString.toByte
      case StringType    => returnVal = value.toString
      case TimestampType => returnVal = value.toString
    }
    list = list :+ returnVal
    index += 1
  })
  Row.fromSeq(list)
})
val df = sqlCon.applySchema(rowRDDx, schema)
HiveContext.sql("create table yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
df.saveAsTable("hive", "org.apache.spark.sql.hive.orc", SaveMode.Append)
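(For reference, the ORC write/read this question is about can also be expressed with the DataFrame reader/writer API available since Spark 1.4. The sketch below is untested here; it builds the DataFrame from the HiveContext, in which case saveAsTable registers the table in the Hive metastore and the manual CREATE TABLE above is not needed.)

import org.apache.spark.sql.SaveMode

// Sketch: create the DataFrame through the HiveContext so the table
// ends up in the Hive metastore rather than a temporary catalog
val hiveDf = HiveContext.createDataFrame(rowRDDx, schema)

// Write it out as an ORC-backed table (Overwrite replaces any existing copy)
hiveDf.write
  .format("orc")
  .mode(SaveMode.Overwrite)
  .saveAsTable("yahoo_orc_table")

// Read it back through the same HiveContext
HiveContext.sql("SELECT * FROM yahoo_orc_table").show()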
When I run this, I get the following exception:
15/10/12 14:57:36 INFO storage.BlockManagerMaster: Registered BlockManager
15/10/12 14:57:38 INFO scheduler.EventLoggingListener: Logging events to hdfs://host:9000/spark/logs/local-1444676256555
Exception in thread "main" java.lang.VerifyError: Bad return type
Exception Details:
Location:
org/apache/spark/sql/catalyst/expressions/Pmod.inputType()Lorg/apache/spark/sql/types/AbstractDataType; @3: areturn
Reason:
Type 'org/apache/spark/sql/types/NumericType$' (current frame, stack[0]) is not assignable to 'org/apache/spark/sql/types/AbstractDataType' (from method signature)
Current Frame:
bci: @3
flags: { }
locals: { 'org/apache/spark/sql/catalyst/expressions/Pmod' }
stack: { 'org/apache/spark/sql/types/NumericType$' }
Bytecode:
0000000: b200 63b0
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
at java.lang.Class.getConstructor0(Class.java:2895)
at java.lang.Class.getDeclaredConstructor(Class.java:2066)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun.apply(FunctionRegistry.scala:267)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun.apply(FunctionRegistry.scala:267)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.expression(FunctionRegistry.scala:267)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<init>(FunctionRegistry.scala:148)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<clinit>(FunctionRegistry.scala)
at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:414)
at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:413)
at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:39)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:203)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)
Thanks
As dr_stein mentioned, this error is usually caused by incompatible compile-time and runtime JDK versions, for example running on JDK 1.6 against jars compiled with 1.7.
I would also check that your Hive libraries match the expected version, and that your Hive server is running on the same JDK as you are.
You can also try running with the -noverify option, which disables bytecode verification.
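A quick way to test the version theory from inside the driver (just a sketch using standard system properties):

// Print the JVM the driver actually runs on, to compare against the JDK
// the Spark / Hive jars on the classpath were compiled with
println("java.version   = " + System.getProperty("java.version"))
println("java.vm.vendor = " + System.getProperty("java.vm.vendor"))
println("spark version  = " + sc.version)

If you do try -noverify, the flag has to reach the JVM that runs the driver, e.g. through spark-submit's --driver-java-options for a submitted job or your IDE's JVM options for a local run; treat it as a diagnostic workaround rather than a fix for the underlying version mismatch.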