Issue with Kafka Broker - UnknownServerException
Our application uses springBootVersion = 2.0.4.RELEASE along with the compile('io.projectreactor.kafka:reactor-kafka:1.0.1.RELEASE') dependency. Our Kafka broker version is 1.0.1.
When we send messages to Kafka by creating a reactor.kafka.sender.SenderRecord, we intermittently find, on inspecting the response via reactor.kafka.sender.SenderResult.exception(), that the exception is populated with:
java.lang.RuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
After retrying a few times, the message goes through successfully.
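For context, our send path looks roughly like the sketch below (the bootstrap server, topic values, and correlation id here are illustrative placeholders, not our actual configuration):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class SendExample {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaSender<String, String> sender =
                KafkaSender.create(SenderOptions.create(props));

        // Each SenderRecord carries correlation metadata so the result
        // can be matched back to the record that produced it.
        sender.send(Flux.just(SenderRecord.create(
                        new ProducerRecord<>("price-promotions-local-event", "key", "value"), 1)))
              .doOnNext(result -> {
                  // exception() is non-null when the broker rejected the record,
                  // which is where we intermittently see the UnknownServerException.
                  if (result.exception() != null) {
                      result.exception().printStackTrace();
                  }
              })
              .blockLast();

        sender.close();
    }
}
```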
The following error is printed multiple times in the broker logs, without any stack trace:
[2019-02-08 15:43:07,501] ERROR [ReplicaManager broker=3] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)
where price-promotions-local-event is our topic.
I have looked online, but there is no clear solution or way to triage this issue; any help is greatly appreciated.
On further investigation, we were able to get the stack trace in the broker logs:
ERROR [ReplicaManager broker=1] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:442)
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:595)
at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed(LogValidator.scala:138)
at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed$adapted(LogValidator.scala:136)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed(LogValidator.scala:136)
at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed$adapted(LogValidator.scala:133)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.LogValidator$.convertAndAssignOffsetsNonCompressed(LogValidator.scala:133)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:64)
at kafka.log.Log.liftedTree1(Log.scala:654)
at kafka.log.Log.$anonfun$append(Log.scala:642)
at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
at kafka.log.Log.append(Log.scala:624)
at kafka.log.Log.appendAsLeader(Log.scala:597)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader(Partition.scala:499)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:223)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:487)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog(ReplicaManager.scala:724)
at scala.collection.TraversableLike.$anonfun$map(TraversableLike.scala:234)
at scala.collection.mutable.HashMap.$anonfun$foreach(HashMap.scala:138)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:138)
at scala.collection.TraversableLike.map(TraversableLike.scala:234)
at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:708)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:459)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:465)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
at java.lang.Thread.run(Thread.java:748)
From the class file MemoryRecordsBuilder, available in org.apache.kafka:kafka-clients:1.0.2, we have the code below, which is where this IllegalArgumentException is thrown:
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
So headers were being set on the ProducerRecord, and that was causing the issue. On printing the ProducerRecord, we found that the headers were added by AppDynamics: a "singularityheader" is added to each produced Kafka record.
c.t.a.p.i.m.i.KafkaProducerInterceptor : The kafka Interceptor ProducerRecord header:: RecordHeader(key = singularityheader, value = [110, 111, 116, 120, 100, 101, 116, 101, 99, 116, 61, 116, 114, 117, 101, 42, 99, 116, 114, 108, 103, 117, 105, 100, 61, 49, 53, 53, 49, 51, 55, 51, 54, 57, 49, 42, 97, 112, 112, 73, 100, 61, 55, 49, 48, 51, 50, 42, 110, 111, 100, 101, 105, 100, 61, 49, 51, 53, 55, 53, 51, 53])
So we explicitly set the headers to null in our interceptor, which fixed the issue.
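The interceptor change amounts to something like the sketch below (the class name HeaderStrippingInterceptor is illustrative; the key point is rebuilding the record with null headers before it is serialized):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class HeaderStrippingInterceptor<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // Rebuild the record with null headers so nothing (e.g. the
        // AppDynamics "singularityheader") reaches the pre-v2 message
        // format, which cannot represent record headers.
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), record.value(), null);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // No-op: we only need to intercept the outgoing record.
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```

The interceptor is registered via the standard producer property interceptor.classes (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG). Note that upgrading the topic's message format to v2 (message.format.version >= 0.11) would also avoid the error, since v2 supports headers.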