Spark SQL over Streaming - ArrayIndexOutOfBoundsException
Spark SQL over Streaming - ArrayIndexOutOfBoundsException
我有以下代码可以在流媒体上启动 SQL 查询。我的问题是,在其中一个结果显示 ArrayIndexOutOfBoundsException 之后。为什么会这样?
import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.functions.udf
object StreamingSQL {
case class Persons(name: String, age: Int)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
val sc = new SparkContext(sparkConf)
// Create the context
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("/home/cloudera/Smartcare/stream/")
lines.foreachRDD(rdd=>rdd.foreach(println))
val sqc = new SQLContext(sc);
//import sqc.createSchemaRDD
import sqc.implicits._
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
lines.foreachRDD{rdd=>
val persons = rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).toDF()
persons.registerTempTable("data")
val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
teenagers.foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}
这是我得到的输出。结果正确后,我跳过错误:
16/03/23 16:58:56 INFO GenerateUnsafeProjection: Code generated in 131.828141 ms
[Edgar]
16/03/23 16:58:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ArrayIndexOutOfBoundsException: 1
我的 txt 是:
Ana,31
Edgar,16
Luis,22
Noelia,26
Isabel50
Pablo,34
Laura,18
Paco,17
因为Isabel50
没有逗号。您的 split(",")
只为该行返回一个值,因此 p(1)
对该行失败。
我有以下代码可以在流媒体上启动 SQL 查询。我的问题是,在其中一个结果显示 ArrayIndexOutOfBoundsException 之后。为什么会这样?
import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.functions.udf
object StreamingSQL {
case class Persons(name: String, age: Int)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
val sc = new SparkContext(sparkConf)
// Create the context
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("/home/cloudera/Smartcare/stream/")
lines.foreachRDD(rdd=>rdd.foreach(println))
val sqc = new SQLContext(sc);
//import sqc.createSchemaRDD
import sqc.implicits._
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
lines.foreachRDD{rdd=>
val persons = rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).toDF()
persons.registerTempTable("data")
val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
teenagers.foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}
这是我得到的输出。结果正确后,我跳过错误:
16/03/23 16:58:56 INFO GenerateUnsafeProjection: Code generated in 131.828141 ms
[Edgar]
16/03/23 16:58:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ArrayIndexOutOfBoundsException: 1
我的 txt 是:
Ana,31
Edgar,16
Luis,22
Noelia,26
Isabel50
Pablo,34
Laura,18
Paco,17
因为Isabel50
没有逗号。您的 split(",")
只为该行返回一个值,因此 p(1)
对该行失败。