在 SSL / TLS 侦听器上读取 Kafka 运行 上的 BlockingChannel 时出现 EOF 异常

Getting EOF exception when reading BlockingChannel on Kafka running on SSL / TLS Listener

当 Kafka 代理正在使用端口 9093 侦听 SSL 时,我的一段代码抛出异常 EOF,在明文侦听器中,代码片段工作正常。

知道这里可能有什么问题吗??

     public KafkaMetadataHelper(String kafkaConnect) throws Exception {
    // use lowlevel kafka.api to query consumer group metadata (ie max committed offset)
    String[] hostAndPort = kafkaConnect.split(":");
    String host = hostAndPort[0];
    int port = Integer.parseInt(hostAndPort[1]);
    channel = new BlockingChannel(host, port,
                                  BlockingChannel.UseDefaultBufferSize(),
                                  BlockingChannel.UseDefaultBufferSize(),
                                  10000);
    channel.connect();
    GroupCoordinatorRequest request = new GroupCoordinatorRequest(MY_GROUP,
                                                                  GroupCoordinatorRequest.CurrentVersion(),
                                                                  correlationId++,
                                                                  MY_CLIENTID);
    channel.send(request);
    GroupCoordinatorResponse metadataResponse = null;
    try {
         metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());// This is where the exception is thrown 

    } catch (Exception e) {
        e.printStackTrace();
    }

}

我收到的错误消息是这样的。

    java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)

为了通过 TLS 连接,您的客户端需要一些设置! BlockingChannel不允许调用者指定任何设置。

我建议您查看 ConsumerGroupCommand.scala [1] 并了解它如何使用 AdminClient [2] 检索有关消费者组的详细信息。

  1. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L496
  2. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala#L197