Kafka Consumer throwing "OutOfMemoryError: Java heap space" Error on SSL
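I am using Spring-Kafka 2.7.1 in a Spring Boot project.
When I connect it to a Kafka broker configured with SSL, it throws the OutOfMemoryError shown below, and increasing the heap size several times has not helped.
Log below: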
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) ~[na:na]
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) ~[na:na]
    at org.apache.kafka.common.memory.MemoryPool.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) ~[spring-kafka-2.7.7.jar!/:2.7.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.7.7.jar!/:2.7.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163) ~[spring-kafka-2.7.7.jar!/:2.7.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
My current YAML configuration is as follows:
spring:
  kafka:
    bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
    security:
      protocol: "SSL"
    consumer:
      auto-offset-reset: earliest
producer:
  topic: TOPIC
  bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
consumer:
  topic: TOPIC
  bootstrap-servers: KAFKA_BOOTSTRAP_SERVERS
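It works as expected when connecting to a non-SSL Kafka broker.
I have tested all the other possibilities and narrowed it down to the client's SSL configuration.
You may run into an out-of-memory error when trying to connect to a secured Kafka endpoint in a non-secure way. (This is a known issue when the wrong security protocol is used or the required authentication properties are not passed; the OOM error has nothing to do with actual memory pressure, but that is how the failure surfaces.)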
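One plausible explanation for why the failure surfaces as an OOM: the stack trace above fails inside NetworkReceive.readFrom, which allocates a buffer for an incoming response. Kafka responses start with a 4-byte big-endian size, so a client that is not speaking TLS may read the first bytes of a TLS record as that size. The sketch below is not Kafka code; the class name, method name, and sample bytes (the start of a TLS 1.2 alert record) are assumptions chosen for illustration.

```java
import java.nio.ByteBuffer;

public class SizePrefixDemo {

    // Interprets the first four bytes as a big-endian int, the way a
    // length-prefixed protocol reader would before allocating a buffer.
    static int readSizePrefix(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumed sample: content type 0x15 (alert), version 0x0303 (TLS 1.2),
        // then the high byte of the record length.
        byte[] tlsAlertStart = {0x15, 0x03, 0x03, 0x00};
        int size = readSizePrefix(tlsAlertStart);
        System.out.println("Bytes read as a size prefix: " + size);
        System.out.println("Approx. MiB requested: " + size / (1024 * 1024));
    }
}
```

A single allocation in the hundreds of megabytes can exhaust a modest heap, and the exact figure depends on whatever bytes the broker sends back, so raising the heap size is not a dependable fix.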
For Kafka CLI commands, a properties file path is usually passed with the command to supply the security-related properties.
For example:
kafka-topics --command-config <String: filename>
kafka-console-producer --producer.config <String: filename>
kafka-console-consumer --consumer.config <String: filename>
The file typically contains:
security.protocol=<kafka_security_protocol>
ssl.truststore.location=<ssl_truststore_filename>
ssl.truststore.password=<truststore_password>
ssl.keystore.location=<client_keystore.jks>
ssl.keystore.password=<password>
ssl.key.password=<password>
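Based on the question, I assume the producer and consumer components both connect to the same broker(s), so all the properties required to connect to the secured broker are declared under the spring.kafka section in the following example.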
spring:
  kafka:
    bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
    security:
      protocol: "SSL"
    ssl:
      trust-store-location: "truststore.jks"
      trust-store-password: "<password>"
      key-store-location: "keystore.jks"
      key-store-password: "<password>"
      key-password: "<password>"
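If the producer and consumer connect to different broker(s), these properties should be specified under the spring.kafka.producer and spring.kafka.consumer sections respectively.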
spring:
  kafka:
    bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
    security:
      protocol: "SSL"
    producer:
      topic: TOPIC
      bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
      ssl.protocol: "SSL"
      ssl.endpoint.identification.algorithm: "https"
      ssl:
        keystore-location: "<keystore.jks>"
        keystore-password: "<password>"
    consumer:
      topic: TOPIC
      auto-offset-reset: "earliest"
      bootstrap-servers: KAFKA_BOOTSTRAP_SERVERS
      ssl.protocol: "SSL"
      ssl.endpoint.identification.algorithm: "https"
      ssl:
        keystore-location: "<keystore.jks>"
        keystore-password: "<password>"
If client authentication is not required on the broker side, the following is a minimal configuration example:
security.protocol=SSL
ssl.truststore.location=<kafka.client.truststore.jks>
ssl.truststore.password=<password>
If client authentication is required, the following properties also need to be included:
ssl.keystore.location=<kafka.client.keystore.jks>
ssl.keystore.password=<password>
ssl.key.password=<password>
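For a client configured in code rather than through a properties file, the two cases above could be sketched as follows. This is a minimal sketch using only java.util.Properties; the class name, the build helper, and the paths and passwords are hypothetical placeholders, while the property keys are the ones listed above.

```java
import java.util.Properties;

public class SslClientProps {

    // Builds the minimal SSL client settings; the key store entries are added
    // only when the broker requires client authentication (keyStore != null).
    static Properties build(String trustStore, String trustStorePassword,
                            String keyStore, String keyStorePassword, String keyPassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", trustStore);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        if (keyStore != null) {
            props.setProperty("ssl.keystore.location", keyStore);
            props.setProperty("ssl.keystore.password", keyStorePassword);
            props.setProperty("ssl.key.password", keyPassword);
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder paths and passwords; substitute real values.
        Properties oneWay = build("/path/to/truststore.jks", "changeit", null, null, null);
        Properties mutual = build("/path/to/truststore.jks", "changeit",
                "/path/to/keystore.jks", "changeit", "changeit");
        System.out.println(oneWay.size() + " properties without client auth");
        System.out.println(mutual.size() + " properties with client auth");
    }
}
```

The resulting Properties would still need the bootstrap servers and the serializer or deserializer settings before being handed to a Kafka client.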
Please note that the property naming conventions may differ in the Spring Kafka configuration.
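To make the naming difference concrete, the sketch below pairs the Spring Boot property names used in the YAML example in this answer with the Kafka client names from the properties file. It is only a lookup table for illustration; the class and method names are hypothetical, and the exact set of supported keys should be checked against the Spring Boot version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpringKafkaSslNames {

    // Spring Boot property name -> Kafka client property name.
    static Map<String, String> mapping() {
        Map<String, String> names = new LinkedHashMap<>();
        names.put("spring.kafka.security.protocol", "security.protocol");
        names.put("spring.kafka.ssl.trust-store-location", "ssl.truststore.location");
        names.put("spring.kafka.ssl.trust-store-password", "ssl.truststore.password");
        names.put("spring.kafka.ssl.key-store-location", "ssl.keystore.location");
        names.put("spring.kafka.ssl.key-store-password", "ssl.keystore.password");
        names.put("spring.kafka.ssl.key-password", "ssl.key.password");
        return names;
    }

    public static void main(String[] args) {
        // Print each pair so the two conventions can be compared side by side.
        mapping().forEach((spring, kafka) -> System.out.println(spring + " -> " + kafka));
    }
}
```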
More details on Kafka security - Official Doc