
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord

I'm trying to read some .avro files from HDFS (with spark-avro 1.7.7) using Spark 1.5.1 (with Scala 2.10.2), in order to do some computation on them.

Now, assuming I have already searched the web thoroughly for a solution (the best links so far are this one, which suggests using a GenericRecord, this one, which reports the same issue, and this one, which doesn't work for me because it gives almost the same code I'm using), I'm asking here because someone may have run into the same problem. Here is the code:

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} 
import org.apache.hadoop.io.NullWritable 
import org.apache.spark.{SparkConf, SparkContext}

object SparkPOC {

  def main(args: Array[String]): Unit ={

    val conf = new SparkConf()
      .setAppName("SparkPOC")
      .set("spark.master", "local[4]")
    val sc = new SparkContext(conf)
    val path = args(0)
    val profiles = sc.hadoopFile(
      path,
      classOf[AvroInputFormat[MyRecord]],
      classOf[AvroWrapper[MyRecord]],
      classOf[NullWritable]
    )

    val timeStamps = profiles.map { p => p._1.datum.getTimeStamp().toString }
    timeStamps.foreach(print)
  }
}

I get the following message:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
    at packagename.SparkPOC$$anonfun.apply(SparkPOC.scala:24)
    at packagename.SparkPOC$$anonfun.apply(SparkPOC.scala:24)
    at scala.collection.Iterator$$anon.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Does anybody have a clue? I was also considering using spark-avro, but it doesn't support reading several files at the same time (while .hadoopFile supports wildcards). Otherwise, it seems I have to go for GenericRecord and use the .get method, losing the advantage of the coded schema (MyRecord).
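For the record, the GenericRecord fallback I mean would look roughly like this (the field name "timeStamp" is assumed here, and every value comes back untyped through get()):

import org.apache.avro.generic.GenericRecord

// Read with the generic API: no cast to MyRecord, fields accessed by name
val genericProfiles = sc.hadoopFile(
  path,
  classOf[AvroInputFormat[GenericRecord]],
  classOf[AvroWrapper[GenericRecord]],
  classOf[NullWritable]
)
val timeStamps = genericProfiles.map(_._1.datum.get("timeStamp").toString)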

Thanks in advance.

I usually read it in as a GenericRecord and cast explicitly as needed, i.e.

val conf = sc.hadoopConfiguration
sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]], classOf[NullWritable], conf
  ).map(_._1.datum().asInstanceOf[MyRecord])
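If it helps, these should be the imports that snippet relies on (AvroKey and AvroKeyInputFormat come from avro-mapred, with the input format living in the new mapreduce package):

// avro-mapred: AvroKey is in org.apache.avro.mapred, AvroKeyInputFormat in org.apache.avro.mapreduce
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable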

The problem went away after I set the KryoSerializer and a spark.kryo.registrator class, as follows:

val config = new SparkConf()
  .setAppName(appName)
  .set("spark.master", master)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.mypackage.AvroKryoRegistrator")

where AvroKryoRegistrator is something like this.
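For completeness, a minimal sketch of what such a registrator could look like, assuming it only needs to register the generated record class with Kryo (the linked example may register dedicated Avro serializers instead):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Minimal sketch: register the generated Avro record class with Kryo.
// MyRecord stands in for whatever specific records the job touches.
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyRecord])
  }
}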