使用 kryo 序列化 protobuf 的问题

Issue in serializing protobuf using kryo

我在 Java 中实现了自己的序列化程序。让我们称之为 abcSerializer。我尝试序列化的对象是 abc,它是一个 Google Protocol Buffer class.

我正在尝试使用 kryo 框架来序列化此对象。经过一些研究和阅读 google,我决定继续使用 kryo 序列化程序。我没有指定任何序列化器本身,所以我假设 kryo 选择了一个默认的序列化器。

public class abcSerializer implements AttributeSerializer <abc> {

  public abcSerializer() {
      kryo = new Kryo();
  }

  public static Kryo getKryo() {
      return kryo;
  }

  @Override
  public abc read(byte[] buffer) {

    abc xyz = null;

    ByteArrayInputStream abcsStream = new ByteArrayInputStream(buffer);
    Input abcsStreamInput = new Input(abcsStream);
    xyz = getKryo().readObject(abcsStreamInput, abc.class);
    return xyz;
}

@Override
public void write(byte[] buffer, abc abc) {

   ByteArrayOutputStream abcStream = new ByteArrayOutputStream();
   Output abcOutput = new Output(abcStream);

   getKryo().writeObject(abcOutput, abc);

   abcOutput.toBytes()        
}

}

当我写对象时,一切都很好。但是,当我执行 readObject 时,问题就来了。 Kyro 抛出以下异常。

com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): java.util.Collections$Unmodifiabljava.lang.IllegalStateException: Encountered error in deserializer [null value returned]. Check serializer compatibility.eRandomAccessList

上面的异常几乎是不言自明的。

kryo 文档如下。 ""Serializers for a specific type use Java code to create a new instance of that type. Serializers such as FieldSerializer are generic and must handle creating a new instance of any class. By default, if a class has a zero argument constructor then it is invoked via ReflectASM or reflection, otherwise an exception is thrown. If the zero argument constructor is private, an attempt is made to access it via reflection using setAccessible. If this is acceptable, a private zero argument constructor is a good way to allow Kryo to create instances of a class without affecting the public API.""

现在,我有两个问题。

1) google 生成的协议缓冲区 class 确实有一个无参数构造函数。然而,这似乎是私人的。这是上述 kryo 异常的问题和根本原因吗?

2) 如果是这样,如何在上述问题上取得进展?我的意思是,我如何编写自己的序列化程序并仍然用于序列化 google 协议缓冲区对象数据?

你觉得这个工作怎么样?

/**
 * This lets Kryo serialize protobufs more efficiently.
 */
public class ProtobufKryo<P extends GeneratedMessage> extends Serializer<P> {
    protected final Method parser;

    public ProtobufKryo(Class<P> theClass) {
        try {
            parser = theClass.getDeclaredMethod("parseFrom", InputStream.class);
            parser.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(theClass.toString() + " doesn't have parser");
        }
    }

    @Override
    public void write(Kryo kryo, Output output, P generatedMessage) {
        try {
            generatedMessage.writeTo(output);
        } catch (IOException e) {
            // This isn't supposed to happen with a Kryo output.
            throw new RuntimeException(e);
        }
    }

    @Override
    public P read(Kryo kryo, Input input, Class<P> gmClass) {
        try {
            return (P)parser.invoke(null, input);
        } catch (InvocationTargetException | IllegalAccessException e) {
            // These really shouldn't happen
            throw new IllegalArgumentException(e);
        }
    }
}

好的,一些解释....

当 Kryo 遇到它无法识别的 class 对象时,它会回退到 Java 序列化。不是每一个都高效,有时候还不行。

(好吧,我承认,以上可能并不总是正确的。它可能是 Kryo 配置的一部分。在我工作的环境中正确的。)

您可以告诉它为特定的 classes 使用自己的序列化,但有时您可以通过为特定的 classes 创建自定义序列化程序来做得更好。

以上利用了 protobufs 在 Kryo 中的现有序列化。基本上,它使用现有的 protobuf writeTo()parseFrom() 来处理 Kryo 中的序列化。您将注册上述 class 以序列化每个 protobuf classes。 (Protobuf class 扩展 GeneratedMessage。)

写出对象只是使用普通的protobuf writeTo()方法。在 protobuf 中回读使用 classes parseFrom() 方法,该方法是通过构造函数中的反射找到的。

因此,您可以使用以下内容配置序列化程序:

  Kryo k = new Kryo();
  k.addDefaultSerializer(MyProtobuf.class, ProtobufKryo.class);
  k.addDefaultSerializer(MyOtherProtobuf.class, ProtobufKryo.class);

等等

我发现 DaveWill's previous 有两个小问题:

  1. 使用 InputStream 在方法 read 中抛出异常。
  2. 有时在使用 Protobuf 生成具有许多嵌套 classes/messages.
  3. 的 class 时编写 class 名称很麻烦

所以我在下面的代码中做了一些小的改进来解决这个问题:

  • byte[] 数组替换了 InputStream
  • 添加了一个帮助程序 registerMessagesFrom 用于查找生成的顶级 Protobuf 中的所有嵌套 classes/messages classe.
package com.juarezr.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.AbstractMessage;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ProtobufSerializer<T extends AbstractMessage> extends Serializer<T> implements Serializable {
    
    static final long serialVersionUID = 1667386898559074449L;
    protected final Method parser;

    public ProtobufSerializer(final Class<T> protoMessageClass) {
        try {
            this.parser = protoMessageClass.getDeclaredMethod("parseFrom", byte[].class);
            this.parser.setAccessible(true);
        } catch (SecurityException | NoSuchMethodException ex) {
            throw new IllegalArgumentException(protoMessageClass.toString() + " doesn't have a protobuf parser", ex);
        }
    }

    @Override
    public void write(final Kryo kryo, final Output output, final T protobufMessage) {
        if (protobufMessage == null) {
            output.writeByte(Kryo.NULL);
            output.flush();
            return;
        }
        final byte[] bytes = protobufMessage.toByteArray();
        output.writeInt(bytes.length + 1, true);
        output.writeBytes(bytes);
        output.flush();
    }

    @SuppressWarnings({"unchecked", "JavaReflectionInvocation"})
    @Override
    public T read(final Kryo kryo, final Input input, final Class<T> protoMessageClass) {
        final int length = input.readInt(true);
        if (length == Kryo.NULL) {
            return null;
        }
        final Object bytesRead = input.readBytes(length - 1);
        try {
            final Object parsed = this.parser.invoke(protoMessageClass, bytesRead);
            return (T) parsed;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Unable to deserialize protobuf for class: " + protoMessageClass.getName(), e);
        }
    }

    @Override
    public boolean getAcceptsNull() {
        return true;
    }

    @SuppressWarnings("unchecked")
    public static <M extends AbstractMessage> void registerMessagesFrom(final M rootMessage, final Kryo kryo) {

        final Class<M> messageClass = (Class<M>) rootMessage.getClass();
        final ProtobufSerializer<M> serializer = new ProtobufSerializer<>(messageClass);
        kryo.register(messageClass, serializer);

        final Class<?>[] nestedClasses = messageClass.getDeclaredClasses();
        for (final Class<?> innerClass : nestedClasses) {
            if ((AbstractMessage.class).isAssignableFrom(innerClass)) {
                final Class<M> typedClass = (Class<M>) innerClass;
                final ProtobufSerializer<M> serializer2 = new ProtobufSerializer<>(typedClass);
                kryo.register(typedClass, serializer2);
            }
        }
    }
}

您可以使用类似以下内容配置序列化:

// ...
final Kryo kryo = new Kryo();

ProtobufSerializer.registerMessagesFrom(MyProtoEnclosingClass.MyProtoTopLevelClass.getDefaultInstance(), kryo);

// Add a registration for each generated file and top level class ...

https://github.com/twitter/chill by twitter 是一组 kryo 序列化器,其中还包括一个用于 google 协议缓冲区的序列化器:ProtobufSerializer。