kafkaserver 关闭在单元测试中无限期挂起(kafka 从 0.8.1.1 升级到 0.8.2.0)

kafkaserver shutdown hangs indefinitely on unit tests (kafka upgrade from 0.8.1.1 to 0.8.2.0)

我们最近从 kafka 0.8.1.1 升级到 0.8.2.0。我们的集成测试失败了,因为测试无限期地挂在 kafkaServer.shutdown()

这些是我的经纪人设置

Properties brokerProps = new Properties();
brokerProps.put("zookeeper.connect", "127.0.0.1:8888");
brokerProps.put("port", "9092");
brokerProps.setProperty("num.partitions", "10");
brokerProps.setProperty("broker.id", "1");
brokerProps.setProperty("log.dirs", "some log dir");
brokerProps.setProperty("advertised.host.name", "127.0.0.1");
KafkaConfig config = new KafkaConfig(brokerProps);

这是我看到的堆栈跟踪

 ERROR [kafka-network-thread-9092-0] kafka.network.Processor - Closing socket for /127.0.0.1 because of error
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_25]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_25]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_25]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_25]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) ~[na:1.8.0_25]
    at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:68) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.network.MultiSend.writeTo(Transmission.scala:101) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.network.MultiSend.writeTo(Transmission.scala:101) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.network.Processor.write(SocketServer.scala:472) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.network.Processor.run(SocketServer.scala:342) ~[kafka_2.10-0.8.2.0.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]

使用 jstack 创建线程转储并分析输出。查找阻塞的线程或可运行的线程,它们位于套接字或文件操作等本机调用中。然后检查有问题线程的源代码。

找到问题所在。我们在关闭 kafkaserver 之前关闭了 ZK。 KafkaServer 会无限期地等待以尝试与 zkClient 建立连接。 更改顺序有效。