火花:java.io.NotSerializableException:org.apache.avro.Schema$RecordSchema

Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

我正在使用以下代码创建 avro RDD

 def convert2Avro(data : String ,schema : Schema)  : AvroKey[GenericRecord] = {
   var wrapper = new AvroKey[GenericRecord]()
   var record = new GenericData.Record(schema)
   record.put("empname","John")
    wrapper.datum(record)
    return wrapper 
  }

并按如下方式创建 avro RDD

 var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schema)))

执行时,我在上面的行中遇到以下异常

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

有指针吗?

Schema.ReocrdSchema class 尚未实施 serializable。所以它无法通过网络传输。我们可以将模式转换为字符串并传递给方法,并在方法内部重建模式对象。

var schemaString = schema.toString
var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schemaString)))

在方法内部重构模式:

def convert2Avro(data : String ,schemaString : String)  : AvroKey[GenericRecord] = {
   var schema = parser.parse(schemaString)
   var wrapper = new AvroKey[GenericRecord]()
   var record = new GenericData.Record(schema)
   record.put("empname","John")
    wrapper.datum(record)
    return wrapper 
  }

另一种方法(来自http://aseigneurin.github.io/2016/03/04/kafka-spark-avro-producing-and-consuming-avro-messages.html)是使用静态初始化。

正如他们在 link

上解释的那样

we are using a static initialization block. An instance of the recordInjection object will be created per JVM, i.e. we will have one instance per Spark worker

并且由于它是为每个工作人员重新创建的,因此不需要序列化。

我更喜欢静态初始化器,因为我担心 toString() 可能不包含构造对象所需的所有信息(在这种情况下它似乎工作得很好,但序列化不是 toString() 的广告目的)。但是,使用 static 的缺点是它并不是 static 的真正正确用法(例如,参见 Java: when to use static methods

因此,无论您喜欢哪个 - 因为两者似乎都很好用,所以这可能更取决于您喜欢的风格。

更新 当然,根据您的程序,最优雅的解决方案可能是通过将所有 avro 代码包含在 worker 中来避免所有问题,即执行您需要执行的所有 Avro 处理,例如写入 Kafka 主题或其他任何内容,在 "convert2Avro"。那么就没有必要return将这些对象放回一个RDD中了。这真的取决于你想要 RDD 做什么。