Flink Avro Serialization shows "not serializable" error when working with GenericRecords

I am really struggling to get Flink to communicate properly with a running Kafka instance that uses Avro schemas from the Confluent Schema Registry (for both key and value).

After some time of thinking and restructuring my program, I was able to push my implementation this far:

Producer method

    public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {  
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
        properties.put("schema.registry.url", "http://--.-.-.---:8081");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); // wrong class, but should not matter
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); // wrong class, but should not matter


        return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
                new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
                properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    }

GenericSerializer.java

package com.reeeliance.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

    private String topic;   
    private Schema schemaKey;
    private Schema schemaValue;
    private String registryUrl;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        super();
        this.topic = topic;
        this.schemaKey = schemaK;
        this.schemaValue = schemaV;
        this.registryUrl = url;
    }

    public GenericSerializer() {
        super();
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
        byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);

        return new ProducerRecord<byte[], byte[]>(topic, key, value);
    }

}

However, when I execute the job, it fails in the preparation phase, before any job is actually running, with the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
    at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
    at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
    - custom writeObject data (class "java.util.ArrayList")
    - root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 8 more

I know that all classes have to implement the Serializable interface or be made transient, but I am not using any of my own classes here, and the error does not point at a non-serializable function (as is usually the case with thread handling) but at a record or field. The field comes from the key schema, which contains only this one field. I assume my mistake lies in using GenericRecord, which does not implement the Serializable interface, but I see GenericRecord being used heavily for exactly this kind of serialization, so it really does not make sense to me.

The class ConfluentRegistryAvroSerializationSchema is taken from GitHub, as it is not yet included in the current Flink version (1.9.1) we are using. I included the necessary classes and adapted them, and I don't think this is the reason for my problem. (Issue solved)

Can anyone help me debug this? I would also appreciate it if you could show me a different way of achieving the same goal; the incompatibility of Flink Avro and the Confluent Schema Registry has been driving me crazy so far.

The exception message tells you which class is not serializable.

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field

The problem lies in the Schema class, which you store in fields of your GenericSerializer.
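Flink's ClosureCleaner checks this with plain Java serialization, so any non-serializable object reachable from the serializer's fields produces exactly this exception. Here is a minimal, stdlib-only sketch (using a made-up `FakeSchema` class standing in for Avro's `Schema`; no Flink or Avro classes involved) that reproduces the effect and shows why holding only serializable fields fixes it:

```java
import java.io.*;

public class NonSerializableFieldDemo {
    // Stand-in for Avro's Schema: a perfectly ordinary class that does NOT implement Serializable.
    static class FakeSchema {
        final String definition;
        FakeSchema(String definition) { this.definition = definition; }
    }

    // Mirrors the original GenericSerializer: Serializable itself, but holding a non-serializable field.
    static class HoldsSchema implements Serializable {
        final FakeSchema schema = new FakeSchema("{\"type\":\"string\"}");
    }

    // Mirrors the fix: keep only a serializable String and rebuild the schema on demand.
    static class HoldsSchemaString implements Serializable {
        final String schemaString = "{\"type\":\"string\"}";
        FakeSchema schema() { return new FakeSchema(schemaString); }
    }

    // Same check the ClosureCleaner effectively performs: try to Java-serialize the object.
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(javaSerializable(new HoldsSchema()));       // false
        System.out.println(javaSerializable(new HoldsSchemaString())); // true
    }
}
```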

You can try this instead:

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>> {

    private final String topic;
    private final SerializationSchema<GenericRecord> keySerializer;
    private final SerializationSchema<GenericRecord> valueSerializer;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        this.topic = topic;
        // build the serializers once; only these (serializable) objects end up in fields
        this.keySerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaK, url);
        this.valueSerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaV, url);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = keySerializer.serialize(element.f0);
        byte[] value = valueSerializer.serialize(element.f1);

        return new ProducerRecord<>(topic, key, value);
    }

}

ConfluentRegistryAvroSerializationSchema is serializable, so you can safely store it in a field of GenericSerializer.

It will also be more performant, since the underlying structures are not re-instantiated for every incoming record.

Is there any conclusion on the issue of Flink falling back to Kryo for Avro generic records?

I am using Scala and added the type information like this:

implicit val typeInformation: TypeInformation[GenericRecord] = TypeInformation.of(new TypeHint[GenericRecord] {
  new GenericRecordAvroTypeInfo(EventMessage.SCHEMA$)
})

The stream is set up as follows:

DataStream[GenericRecord]

But the Flink runtime still falls back to Kryo, because it does not recognize the Avro generic record and treats it as a generic type.

The problem is with the org.apache.avro.Schema$Field class. That class is not serializable, which causes this exception. The solution is mentioned in the Flink documentation as well as in the comments section:

Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.

So we need to parse the schema for every message we receive; we cannot parse it just once in the constructor and keep the resulting Schema in a field, since that would be the same as passing the Schema to the constructor directly.
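If re-parsing on every record ever becomes a concern, a common refinement (not part of the answer above, just a sketch of the pattern) is to keep the parsed result in a transient field and rebuild it lazily after deserialization. That is still in line with the documentation's "only do this once on initialization" remark. A stdlib-only sketch, again with a made-up `FakeSchema` standing in for Avro's non-serializable `Schema`:

```java
import java.io.*;

public class TransientParseDemo {
    // Stand-in for Avro's non-serializable Schema.
    static class FakeSchema {
        final String definition;
        FakeSchema(String definition) { this.definition = definition; }
    }

    static class StringBackedSerializer implements Serializable {
        private final String schemaString;
        // transient: skipped by Java serialization, rebuilt lazily on the worker
        private transient FakeSchema schema;

        StringBackedSerializer(String schemaString) { this.schemaString = schemaString; }

        FakeSchema schema() {
            if (schema == null) {                      // null again after deserialization
                schema = new FakeSchema(schemaString); // "parse" exactly once per instance
            }
            return schema;
        }
    }

    // Java-serialize and deserialize, the way Flink ships the serializer to workers.
    static StringBackedSerializer roundTrip(StringBackedSerializer s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (StringBackedSerializer) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        StringBackedSerializer shipped = roundTrip(new StringBackedSerializer("{\"type\":\"string\"}"));
        // The transient field did not travel, but is rebuilt on first use:
        System.out.println(shipped.schema().definition);
        // Subsequent calls reuse the same parsed instance:
        System.out.println(shipped.schema() == shipped.schema());
    }
}
```

With Avro, `schemaString` would hold `schema.toString()` and `schema()` would call `new Schema.Parser().parse(schemaString)`.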

So the solution can look like the following snippet: we accept the Avro schema as a String in the constructor and create the actual Schema inside the serialize method.

import java.lang

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
// import ConfluentRegistryAvroSerializationSchema from wherever it lives in your setup
// (org.apache.flink.formats.avro.registry.confluent in newer Flink versions)

class AvroMessageSerializationSchema(topic: String, schemaString: String, schemaRegistry: String) extends KafkaSerializationSchema[GenericRecord] {

  // the Schema is created from the String on demand, so no non-serializable field is stored
  private def getSchema(schema: String): Schema = {
    new Schema.Parser().parse(schema)
  }

  override def serialize(element: GenericRecord, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val schema = getSchema(schemaString)
    val value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistry).serialize(element)
    new ProducerRecord[Array[Byte], Array[Byte]](topic, value)
  }
}

Another thing we need to remember is to provide the type information Flink needs to serialize the Avro records, otherwise it will fall back to Kryo for serialization.

implicit val typeInformation: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)

avro serialization with flink