Spark Scala Error while saving DataFrame to Hive
I built a DataFrame by combining several arrays. I am trying to save it to a Hive table, but I get an ArrayIndexOutOfBoundsException. Below are my code and the error I get. I have tried adding the case class both outside and inside the main def, but I still get the same error.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, DataFrame}
import org.apache.spark.ml.feature.RFormula
import java.text._
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.hive.HiveContext

//case class Rows(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)

object MLRCreate{
//  case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MLRCreate")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext.implicits._
    import hiveContext.sql

    val ReqId = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())
    val dirName = "/user/ec2-user/SavedModels/"+ReqId
    val df = Functions.loadData(hiveContext,args(0),args(1))
    val form = args(1).toLowerCase
    val lbl = form.split("~")

    var lrModel:LinearRegressionModel = null;
    val Array(training, test) = df.randomSplit(Array(args(3).toDouble, (1-args(3).toDouble)), seed = args(4).toInt)
    lrModel = Functions.mlr(training)

    var columnnames = Functions.resultColumns(df).substring(1)
    var columnsFinal = columnnames.split(",")
    columnsFinal = "intercept" +: columnsFinal
    var coeff = lrModel.coefficients.toArray.map(_.toString)
    coeff = lrModel.intercept.toString +: coeff
    var stdErr = lrModel.summary.coefficientStandardErrors.map(_.toString)
    var tval = lrModel.summary.tValues.map(_.toString)
    var pval = lrModel.summary.pValues.map(_.toString)

    var Signif:Array[String] = new Array[String](pval.length)
    for(j <- 0 to pval.length-1){
      var sign = pval(j).toDouble;
      sign = Math.abs(sign);
      if(sign <= 0.001){
        Signif(j) = "***";
      }else if(sign <= 0.01){
        Signif(j) = "**";
      }else if(sign <= 0.05){
        Signif(j) = "*";
      }else if(sign <= 0.1){
        Signif(j) = ".";
      }else{
        Signif(j) = " ";
      }
      println(columnsFinal(j)+"#########"+coeff(j)+"#########"+stdErr(j)+"#########"+tval(j)+"#########"+pval(j)+"########"+Signif)
    }

    case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)
//    print(columnsFinali.mkString("#"),coeff.mkString("#"),stdErr.mkString("#"),tval.mkString("#"),pval.mkString("#"))
    val sums = Array(columnsFinal, coeff, stdErr, tval, pval, Signif).transpose
    val rdd = sc.parallelize(sums).map(ys => Row(ys(0), ys(1), ys(2), ys(3),ys(4),ys(5)))
//    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//    import hiveContext.implicits._
//    import hiveContext.sql
    val result = rdd.toDF("Name","Coefficients","Std_Error","tValue","pValue","Significance")
    result.show()
    result.saveAsTable("iaw_model_summary.IAW_"+ReqId)
    print(ReqId)
    lrModel.save(dirName)
  }
}
Below is the error I am getting,
16/05/12 07:17:25 ERROR Executor: Exception in task 2.0 in stage 23.0 (TID 839)
java.lang.ArrayIndexOutOfBoundsException: 1
at IAWMLRCreate$$anonfun.apply(IAWMLRCreate.scala:96)
at IAWMLRCreate$$anonfun.apply(IAWMLRCreate.scala:96)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I suggest you check the lengths of the arrays you are transposing: columnsFinal, coeff, stdErr, tval, pval, Signif. If any one of them is shorter or longer than the others, some of the transposed rows will be incomplete. Scala does not pad them with nulls or anything else when transposing:
val a1 = Array(1,2,3)
val a2 = Array(5,6)
Array(a1, a2).transpose.foreach(x => println(x.toList))
prints:
List(1, 5)
List(2, 6)
List(3)
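To find out which array is the odd one out, one option is to print each array's length just before the transpose and fail fast if they disagree. The following is only a minimal diagnostic sketch reusing the variable names from the question (columnsFinal, coeff, stdErr, tval, pval, Signif); it is not part of the original code.

    // Minimal diagnostic sketch (assumes the arrays from the question are in scope).
    // Print each array's length, then stop early if they are not all equal,
    // since transpose would otherwise produce short rows and
    // Row(ys(0), ..., ys(5)) would throw ArrayIndexOutOfBoundsException.
    val named = Seq(
      "columnsFinal" -> columnsFinal,
      "coeff"        -> coeff,
      "stdErr"       -> stdErr,
      "tval"         -> tval,
      "pval"         -> pval,
      "Signif"       -> Signif)
    named.foreach { case (name, arr) => println(s"$name length = ${arr.length}") }
    require(named.map(_._2.length).distinct.size == 1,
      "All arrays must have the same length before transposing")

Whichever array reports a different length is the one to fix before re-running the transpose and building the Rows.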