AVRO 模式更新的问题
Troubles with AVRO schema update
我有一个简单的案例class:
case class User(id: String, login: String, key: String)
我正在添加字段"name"
case class User(id: String, login: String, name: String, key: String)
然后在 avro 架构中添加此字段 (user.avsc)
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "key", "type": "string" }
]
}
这个class用于其他情况class:
case class AuthRequest(user: User, session: String)
架构 (auth_request.avsc)
{
"namespace": "test",
"type": "record",
"name": "AuthRequest",
"fields": [
{ "name": "user", "type": "User" },
{ "name": "session", "type": "string" }
]
}
更改后我的消费者开始抛出异常
Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
.map { msg =>
Try {
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
val result: AuthRequest = input.iterator.toSeq.head !!!! here is exception
msg.committableOffset.commitScaladsl()
(msg.record.value(), result, msg.record.key())
} match {
case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
log.info(s"listener got $msg -> $a -> $value")
context.parent ! value
case Failure(e) => e.printStackTrace()
}
}
.runWith(Sink.ignore)
java.util.NoSuchElementException: head of empty stream at
scala.collection.immutable.Stream$Empty$.head(Stream.scala:1104) at
scala.collection.immutable.Stream$Empty$.head(Stream.scala:1102) at
test.consumers.AuthRequestListener.$anonfun$new(AuthRequestListener.scala:39)
at scala.util.Try$.apply(Try.scala:209) at
test.consumers.AuthRequestListener.$anonfun$new(AuthRequestListener.scala:36)
at
test.consumers.AuthRequestListener.$anonfun$new$adapted(AuthRequestListener.scala:35)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:51) at
akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
at
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
at
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
at
akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
at
akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472)
at
akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
at
akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
at
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:760)
at akka.actor.Actor.aroundReceive(Actor.scala:517) at
akka.actor.Actor.aroundReceive$(Actor.scala:515) at
akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588) at
akka.actor.ActorCell.invoke(ActorCell.scala:557) at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
akka.dispatch.Mailbox.run(Mailbox.scala:225) at
akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
我尝试清理构建并使缓存无效 - 我似乎在某些地方缓存了以前版本的架构
请帮忙!
您需要使更改向后兼容,使新字段可为空并为其添加默认值。
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": ["null", "string"], "default": null },
{ "name": "key", "type": "string" }
]
}
我有一个简单的案例class:
case class User(id: String, login: String, key: String)
我正在添加字段"name"
case class User(id: String, login: String, name: String, key: String)
然后在 avro 架构中添加此字段 (user.avsc)
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "key", "type": "string" }
]
}
这个class用于其他情况class:
case class AuthRequest(user: User, session: String)
架构 (auth_request.avsc)
{
"namespace": "test",
"type": "record",
"name": "AuthRequest",
"fields": [
{ "name": "user", "type": "User" },
{ "name": "session", "type": "string" }
]
}
更改后我的消费者开始抛出异常
Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
.map { msg =>
Try {
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
val result: AuthRequest = input.iterator.toSeq.head !!!! here is exception
msg.committableOffset.commitScaladsl()
(msg.record.value(), result, msg.record.key())
} match {
case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
log.info(s"listener got $msg -> $a -> $value")
context.parent ! value
case Failure(e) => e.printStackTrace()
}
}
.runWith(Sink.ignore)
java.util.NoSuchElementException: head of empty stream at scala.collection.immutable.Stream$Empty$.head(Stream.scala:1104) at scala.collection.immutable.Stream$Empty$.head(Stream.scala:1102) at test.consumers.AuthRequestListener.$anonfun$new(AuthRequestListener.scala:39) at scala.util.Try$.apply(Try.scala:209) at test.consumers.AuthRequestListener.$anonfun$new(AuthRequestListener.scala:36) at test.consumers.AuthRequestListener.$anonfun$new$adapted(AuthRequestListener.scala:35) at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:51) at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519) at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482) at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378) at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588) at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472) at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563) at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745) at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:760) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588) at akka.actor.ActorCell.invoke(ActorCell.scala:557) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
我尝试清理构建并使缓存无效 - 我似乎在某些地方缓存了以前版本的架构 请帮忙!
您需要使更改向后兼容,使新字段可为空并为其添加默认值。
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": ["null", "string"], "default": null },
{ "name": "key", "type": "string" }
]
}