Kryo:反序列化 class 的旧版本

Kryo: deserialize old version of class

我需要通过添加两个新参数来修改 class。这个class是用Kryo连载的。 每次我停止直播时,我目前都会将与此 class 相关的信息保存为 RDD。 当我重新启动流时,我加载了我之前保留的信息,并使用它们在我停止和重新启动之间保持一致性。

由于 class 我坚持需要这些新参数,我更改了 class 和序列化器,为新的 kryo.writeObject(output, object, ObjectSerializer)kryo.readObject(input, classOf[Object], ObjectSerializer) 添加了新的参数.

现在,每当我重新启动我的流时,我都会收到一个异常:“遇到未注册的 class ...”。

这似乎很明显,因为我尝试反序列化一个对象,该对象不包含在我停止流时保留的信息中。 如果我删除这些数据并启动流,就好像它之前没有任何数据一样 运行,则不会发生异常。

有没有办法避免这个异常? 也许通过指定一些默认值以防这些参数丢失?

谢谢

编辑:

我发现了一些我以前没见过的有用的东西: Kryo issue 194.

这家伙通过简单地插入一个长定义他应该使用哪个版本的反序列化器来实现版本控制。 这是一个简单的解决方案,但是,由于编写我正在处理的代码的公司没有考虑向前兼容性,我想我将不得不丢弃 window 在新的之前保留的所有数据序列化器。

如果有人能提出更好的解决方案,请告诉我。

编辑 2:

这种情况仍然存在问题。 我尝试按照此处所述使用 CompatibleFieldSerializer :CompatibleFieldSerializer Example 因此,通过注册此序列化程序而不是之前使用的自定义序列化程序。 结果是现在,当重新加载持久数据时,它会给出 java.lang.NullPointerException。 如果以前的数据没有保留,仍然没有问题。我可以启动流、序列化新数据、停止流、反序列化并重新启动流。 仍然没有关于决议的线索。

问题的解决方案是在几个月前找到的。所以我想尽快 post 回答这个问题。 问题在于,由于代码中的错误,class 是使用标准的 Kryo FieldSerializer 序列化的,它不向前兼容。 我们必须执行以下操作来反序列化旧的 class 并将其转换为新的序列化 class.

情况是:

case class ClassA(field1 : Long, field2 : String)

它是这样连载的:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       classA( 
           field1 = input.readLong(),
           field2 = input.readLong()
       )

并且循环包含要使用序列化器序列化的 classes 的 Seq,以便为所有 classes 注册所有序列化器。

    protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    final def register(kryo: Kryo) = {
         registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }

class需要修改,增加一个新的字段,是另一个案例class的实例。

为了执行此类更改,我们必须使用与 Kryo 库相关的注释 "optional"、

...
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional
import scala.annotation.meta.field
...

case class ClassA(field1 : Long, field2 : String,  @(Optional @field)("field3") field3 : ClassB)

序列化程序已修改,例如在读取旧的序列化时 class 它可以使用默认值实例化 field3,并且在写入时写入这样的默认值:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
      kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer)

 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       ClassA( 
           field1 = input.readLong(),
           field2 = input.readLong(),
           field3 = ClassB.default
       )

还修改了 kryo 序列化器注册以注册可选字段:

    protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    def optionals = Seq("field3")

    final def register(kryo: Kryo) = {
        optionals.foreach { optional =>
        kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) }
        registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }

因此,我们能够编写序列化的新版本 class。 在那之后,我们不得不删除可选的注解,修改序列化器以便从新的序列化中读取真实字段class,并删除可选的序列化器注册并将其添加到注册表 Seq.

同时我们更正了代码中通过FieldSerializer强制序列化的错误,但这不在问题范围内。