java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record 无法转换为 packagename.MyRecord
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
我正在尝试使用 Spark 1.5.1(使用 Scala 2.10.2)从 HDFS(使用 spark-avro 1.7.7)读取一些 .avro 文件,以便对它们进行一些计算。
现在,假设我已经彻底搜索了网络以找到解决方案(目前最好的 link 是 this one that suggests to use a GenericRecord, while this one reports the same issue, and this one 对我来说不起作用,因为它给出了几乎与我使用的代码相同),我在这里问,因为可能有人有相同的代码。这是代码:
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}
object SparkPOC {
def main(args: Array[String]): Unit ={
val conf = new SparkConf()
.setAppName("SparkPOC")
.set("spark.master", "local[4]")
val sc = new SparkContext(conf)
val path = args(0)
val profiles = sc.hadoopFile(
path,
classOf[AvroInputFormat[MyRecord]],
classOf[AvroWrapper[MyRecord]],
classOf[NullWritable]
)
val timeStamps = profiles.map{ p => p._1.datum.getTimeStamp().toString}
timeStamps.foreach(print)
}
我收到以下消息:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
at packagename.SparkPOC$$anonfun.apply(SparkPOC.scala:24)
at packagename.SparkPOC$$anonfun.apply(SparkPOC.scala:24)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
有人知道吗?我也在考虑使用 spark-avro 的可能性,但它们不支持同时读取多个文件(而 .hadoopFile 支持通配符)。否则,似乎我必须选择 GenericRecord 并使用 .get 方法,从而失去编码模式 (MyRecord) 的优势。
提前致谢。
我通常将其作为 GenericRecord 读入并根据需要显式转换,即
val conf = sc.hadoopConfiguration
sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable], conf).map(_._1.datum().asInstanceOf[MyRecord])
我设置了KryoSerializer和一个spark.kryo.registratorclass后问题就没有了,如下:
val config = new SparkConf()
.setAppName(appName)
.set("spark.master", master)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.mypackage.AvroKryoRegistrator")
其中 AvroKryoRegistrator 类似于 this。
我正在尝试使用 Spark 1.5.1(使用 Scala 2.10.2)从 HDFS(使用 spark-avro 1.7.7)读取一些 .avro 文件,以便对它们进行一些计算。
现在,假设我已经彻底搜索了网络以找到解决方案(目前最好的 link 是 this one that suggests to use a GenericRecord, while this one reports the same issue, and this one 对我来说不起作用,因为它给出了几乎与我使用的代码相同),我在这里问,因为可能有人有相同的代码。这是代码:
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}
object SparkPOC {
def main(args: Array[String]): Unit ={
val conf = new SparkConf()
.setAppName("SparkPOC")
.set("spark.master", "local[4]")
val sc = new SparkContext(conf)
val path = args(0)
val profiles = sc.hadoopFile(
path,
classOf[AvroInputFormat[MyRecord]],
classOf[AvroWrapper[MyRecord]],
classOf[NullWritable]
)
val timeStamps = profiles.map{ p => p._1.datum.getTimeStamp().toString}
timeStamps.foreach(print)
}
我收到以下消息:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
at packagename.SparkPOC$$anonfun.apply(SparkPOC.scala:24)
at packagename.SparkPOC$$anonfun.apply(SparkPOC.scala:24)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
有人知道吗?我也在考虑使用 spark-avro 的可能性,但它们不支持同时读取多个文件(而 .hadoopFile 支持通配符)。否则,似乎我必须选择 GenericRecord 并使用 .get 方法,从而失去编码模式 (MyRecord) 的优势。
提前致谢。
我通常将其作为 GenericRecord 读入并根据需要显式转换,即
val conf = sc.hadoopConfiguration
sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable], conf).map(_._1.datum().asInstanceOf[MyRecord])
我设置了KryoSerializer和一个spark.kryo.registratorclass后问题就没有了,如下:
val config = new SparkConf()
.setAppName(appName)
.set("spark.master", master)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.mypackage.AvroKryoRegistrator")
其中 AvroKryoRegistrator 类似于 this。