从 0.7 升级到 0.8.1.1 后生成嵌入式 kafka 队列时出错

Error producing to embedded kafka queue after upgrade from 0.7 to 0.8.1.1

我没能找到任何可以直接解决我面临的问题的东西,所以我在这里发帖。我有 JUnit/JBehave 测试启动嵌入式 ZooKeeper 服务器、嵌入式 Kafka 服务器以及 kafka 生产者和消费者。

将kafka从0.7升级到0.8.1.1后,遇到以下类型的错误:

ERROR [kafka-request-handler-5] state.change.logger - Error on broker 1 while processing LeaderAndIsr request correlationId 7 received from controller 1 epoch 1 for partition [topicName,8]
java.lang.NullPointerException: null
at kafka.log.Log.<init>(Log.scala:60) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.log.LogManager.createLog(LogManager.scala:265) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:90) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition$$anonfun$makeLeader.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition$$anonfun$makeLeader.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) ~[scala-library-2.10.4.jar:na]
at kafka.cluster.Partition.makeLeader(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager$$anonfun$makeLeaders.apply(ReplicaManager.scala:305) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager$$anonfun$makeLeaders.apply(ReplicaManager.scala:304) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:304) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:258) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:217) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaApis.handle(KafkaApis.scala:189) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) [kafka_2.10-0.8.1.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]

WARN  [threadName] k.c.ConsumerFetcherManager$LeaderFinderThread - [threadName], Failed to add leader for partitions [topicName,9],[topicName,3],[topicName,0],[topicName,8],[topicName,5],[topicName,1],[topicName,6],[topicName,2],[topicName,7],[topicName,4]; will retry
kafka.common.NotLeaderForPartitionException: null
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_25]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_25]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_25]
at java.lang.reflect.Constructor.newInstance(Constructor.java:408) ~[na:1.8.0_25]
at java.lang.Class.newInstance(Class.java:438) ~[na:1.8.0_25]
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions.apply(AbstractFetcherThread.scala:179) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions.apply(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na]
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions.apply(AbstractFetcherManager.scala:86) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions.apply(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na]
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.10-0.8.1.1.jar:na]

02/03 10:26:34.655 WARN  [kafka-request-handler-7] kafka.server.KafkaApis - [KafkaApi-1] Offset request with correlation id 0 from client clientName on partition [topicName,5] failed due to Leader not local for partition [topicName,5] on broker 1

原来这与新的KafkaServer构造函数中的Time参数有关。

我为 kafka.utils.Time 对象传递了一个空参数:

private KafkaServer server = new KafkaServer(config, null);

相反,您需要创建 kafka.utils.Time 接口的实现,并传入该接口的新实例:

private KafkaServer server = new KafkaServer(config, new SystemTime());

private static class SystemTime implements Time {

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }

    @Override
    public long nanoseconds() {
        return System.nanoTime();
    }

    @Override
    public void sleep(long arg0) {
        try {
            Thread.sleep(arg0);
        } catch (InterruptedException e) {
            log.error("Kafka systemtime interrupted",e);
        }
    }

}