在 SSL / TLS 侦听器上读取 Kafka 运行 上的 BlockingChannel 时出现 EOF 异常
Getting EOF exception when reading BlockingChannel on Kafka running on SSL / TLS Listener
当 Kafka 代理正在使用端口 9093 侦听 SSL 时,我的一段代码抛出异常 EOF,在明文侦听器中,代码片段工作正常。
知道这里可能有什么问题吗??
public KafkaMetadataHelper(String kafkaConnect) throws Exception {
// use lowlevel kafka.api to query consumer group metadata (ie max committed offset)
String[] hostAndPort = kafkaConnect.split(":");
String host = hostAndPort[0];
int port = Integer.parseInt(hostAndPort[1]);
channel = new BlockingChannel(host, port,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
10000);
channel.connect();
GroupCoordinatorRequest request = new GroupCoordinatorRequest(MY_GROUP,
GroupCoordinatorRequest.CurrentVersion(),
correlationId++,
MY_CLIENTID);
channel.send(request);
GroupCoordinatorResponse metadataResponse = null;
try {
metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());// This is where the exception is thrown
} catch (Exception e) {
e.printStackTrace();
}
}
我收到的错误消息是这样的。
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
为了通过 TLS 连接,您的客户端需要一些设置! BlockingChannel
不允许调用者指定任何设置。
我建议您查看 ConsumerGroupCommand.scala
[1] 并了解它如何使用 AdminClient
[2] 检索有关消费者组的详细信息。
当 Kafka 代理正在使用端口 9093 侦听 SSL 时,我的一段代码抛出异常 EOF,在明文侦听器中,代码片段工作正常。
知道这里可能有什么问题吗??
public KafkaMetadataHelper(String kafkaConnect) throws Exception {
// use lowlevel kafka.api to query consumer group metadata (ie max committed offset)
String[] hostAndPort = kafkaConnect.split(":");
String host = hostAndPort[0];
int port = Integer.parseInt(hostAndPort[1]);
channel = new BlockingChannel(host, port,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
10000);
channel.connect();
GroupCoordinatorRequest request = new GroupCoordinatorRequest(MY_GROUP,
GroupCoordinatorRequest.CurrentVersion(),
correlationId++,
MY_CLIENTID);
channel.send(request);
GroupCoordinatorResponse metadataResponse = null;
try {
metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());// This is where the exception is thrown
} catch (Exception e) {
e.printStackTrace();
}
}
我收到的错误消息是这样的。
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
为了通过 TLS 连接,您的客户端需要一些设置! BlockingChannel
不允许调用者指定任何设置。
我建议您查看 ConsumerGroupCommand.scala
[1] 并了解它如何使用 AdminClient
[2] 检索有关消费者组的详细信息。