Migrating from Chill 0.6.0 (Kryo 2.21) to 0.9.5 (Kryo 4.0.2) and deserializing old messages
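We are using Chill-bijection to serialize and deserialize messages with Kryo to and from Kafka. The old version of our application uses Chill 0.6.0, which depends on com.esotericsoftware.kryo.kryo-2.21.jar; the new version uses Chill 0.9.5, which depends on com.esotericsoftware.kryo-shaded-4.0.2.jar.
To minimize downtime, the new version of our application needs to be able to read messages written by the old version, but it fails with this error: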
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition prod_2x02_external_entity_updates-0 at offset 8764198. If needed, please seek past the record to continue consumption.
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@14122f45
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.X.backend.serialization.CustomKafkaKryoDeserializer.deserialize(KafkaKryoSerialization.scala:38)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:128)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1377)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at com.X.common.kafka.KafkaSubscriber$$anonfun$brokerFound.applyOrElse(KafkaSubscriber.scala:163)
at akka.actor.Actor.aroundReceive(Actor.scala:535)
at akka.actor.Actor.aroundReceive$(Actor.scala:533)
at com.X.common.kafka.KafkaSubscriber.aroundReceive(KafkaSubscriber.scala:29)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.X.backend.DashboardExternalEntities$ExtMessage
Serialization trace:
entity (com.X.backend.QueueMessageProtocol$ExternalEntityUpdated)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
at com.X.backend.serialization.CustomKafkaKryoDeserializer.$anonfun$deserialize(KafkaKryoSerialization.scala:38)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.X.backend.DashboardExternalEntities$ExtMessage
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
... 35 common frames omitted
Based on this: https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-4.0.0 I implemented a custom ScalaKryoInstantiator and the corresponding class, adding setOptimizedGenerics(true):
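import java.util

import com.twitter.bijection.Inversion
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator, KryoBase, KryoPool, ScalaKryoInstantiator}
// Assumption: KafkaDeserializer is an alias for Kafka's Deserializer interface.
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}

class CustomKafkaKryoDeserializer[M <: AnyRef] extends KafkaDeserializer[M] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  // Decode through the shared pool; any Kryo failure is rethrown as InversionFailure.
  override def deserialize(topic: String, data: Array[Byte]): M = {
    Inversion.attempt(data) {
      CustomKryoInstantiator.defaultPool.fromBytes(_)
    }.get.asInstanceOf[M]
  }
}

object CustomKryoInstantiator extends ScalaKryoInstantiator {
  private val mutex = new AnyRef with Serializable // some serializable object
  @transient private var kpool: Option[KryoPool] = None

  // Lazily create a single pool and reuse it for every call.
  def defaultPool: KryoPool = mutex.synchronized {
    if (kpool.isEmpty) {
      kpool = Some(KryoPool.withByteArrayOutputStream(guessThreads, new CustomKryoInstantiator))
    }
    kpool.get
  }

  // Pool size heuristic: four Kryo instances per available core.
  private def guessThreads: Int = {
    val cores = Runtime.getRuntime.availableProcessors
    val GUESS_THREADS_PER_CORE = 4
    GUESS_THREADS_PER_CORE * cores
  }
}

class CustomKryoInstantiator extends EmptyScalaKryoInstantiator {
  override def newKryo: KryoBase = {
    val k = super.newKryo
    // The Kryo 4 setting suggested by the release notes linked above.
    k.getFieldSerializerConfig.setOptimizedGenerics(true)
    val reg = new AllScalaRegistrar
    reg(k)
    k
  }
}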
But I still get the same error. Is there any way to read messages serialized by Kryo 2.21 with Kryo 4.0.2? The message classes themselves have not changed.
It turned out that the package containing the messages had been renamed, so Kryo could not find the right class.
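For anyone hitting the same ClassNotFoundException: the stack trace shows the failure coming from a Class.forName lookup of the class name stored in the message, so a quick way to confirm a rename is to try that lookup yourself. Below is a minimal sketch using only the standard library; `ClassPresenceCheck` and `isLoadable` are names made up for illustration, and the check only tells you whether the name resolves on the current classpath, not whether the bytes are otherwise compatible.

```scala
import scala.util.Try

object ClassPresenceCheck {
  // Returns true if `className` can be loaded by `loader` without initializing it.
  // This mirrors the kind of lookup (Class.forName) seen in the stack trace.
  def isLoadable(
      className: String,
      loader: ClassLoader = Thread.currentThread().getContextClassLoader
  ): Boolean =
    Try(Class.forName(className, false, loader)).isSuccess
}

object ClassPresenceCheckDemo {
  def main(args: Array[String]): Unit = {
    // The class name reported in the KryoException; pass another name as the first argument.
    val name = args.headOption.getOrElse("com.X.backend.DashboardExternalEntities$ExtMessage")
    println(s"$name loadable: ${ClassPresenceCheck.isLoadable(name)}")
  }
}
```

Running this on the new application's classpath with the name from the exception would show whether the old fully qualified name still exists there.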
Nevertheless, even after reverting the package rename, neither Kryo 4.0.2 nor 3.0.3 could deserialize messages that had been serialized with Kryo 2.21.
In the end, we decided to replace Kryo with Protobuf and to write a MirrorMakerMessageHandler that converts the Kafka messages from Chill-bijection 0.6.0 (Kryo 2.21) to Protobuf.
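The core of such a handler is a decode-then-re-encode step, which can be sketched independently of MirrorMaker. This is not our actual handler: `ReEncode`, `convert`, `decodeOld` and `encodeNew` are hypothetical names, and the two codecs are left abstract. In a real setup `decodeOld` would presumably wrap the Chill 0.6.0 (Kryo 2.21) pool and `encodeNew` the Protobuf serializer.

```scala
import scala.util.{Failure, Success, Try}

object ReEncode {
  // Decode a payload with the legacy format, then encode it with the new one.
  // A Left carries the decode failure so the caller can decide to skip or stop.
  def convert[A](
      decodeOld: Array[Byte] => Try[A],
      encodeNew: A => Array[Byte]
  )(payload: Array[Byte]): Either[Throwable, Array[Byte]] =
    decodeOld(payload) match {
      case Success(msg) => Right(encodeNew(msg))
      case Failure(err) => Left(err)
    }
}

object ReEncodeDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in codecs: UTF-8 text in, upper-cased UTF-8 text out.
    val decode: Array[Byte] => Try[String] = bytes => Try(new String(bytes, "UTF-8"))
    val encode: String => Array[Byte] = s => s.toUpperCase.getBytes("UTF-8")

    ReEncode.convert(decode, encode)("abc".getBytes("UTF-8")) match {
      case Right(bytes) => println(new String(bytes, "UTF-8"))
      case Left(err)    => println(s"could not convert: ${err.getMessage}")
    }
  }
}
```

Returning an Either instead of throwing keeps a single undecodable record from stopping the whole conversion, which matters when the old topic may contain messages in more than one format.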