Spring 使用 SSL 引导应用程序连接到 Kafka

Spring Boot App connection to Kafka with SSL

我有简单的 Spring Boot 应用程序和 Kafka,具有有效的 SSL 连接(其他应用程序,不是 Spring Boot,连接成功)。 我无法访问 kafka 经纪人的财产。 我的应用程序是 kafka 的客户端。这个应用程序 运行 在 kubernetes 的容器中。我的 spring 引导程序可以访问 keystore.p12、ca-cert、kafka.pem、kafka.key 文件(在容器内的目录中)。

在我使用的配置中

spring.kafka.security.protocol=SSL
spring.kafka.ssl.protocol=SSL
spring.kafka.ssl.key-store-type=PKCS12
spring.kafka.ssl.key-store-location=file:///path/to/keystore.p12
spring.kafka.ssl.key-store-password=password
spring.kafka.ssl.trust-store-type=PKCS12
spring.kafka.ssl.trust-store-location=file:///path/to/keystore.p12 (it's the same file, and I think it's incorrect)
spring.kafka.ssl.trust-store-password=password

spring.kafka.properties.ssl.endpoint.identification.algorithm=
spring.kafka.enable.ssl.certificate.verification=false

每次我收到错误

org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alert.createSSLException(Alert.java:131) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:349) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:292) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:287) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:654) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369) ~[?:?]
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551) [kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1389) [kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1320) [kafka-clients-3.0.0.jar!/:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) ~[?:?]
    at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:276) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369) ~[?:?]
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551) [kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1389) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1320) ~[kafka-clients-3.0.0.jar!/:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141) ~[?:?]
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126) ~[?:?]
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) ~[?:?]
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) ~[?:?]
    at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:276) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473) ~[?:?]
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369) ~[?:?]
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551) [kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1389) ~[kafka-clients-3.0.0.jar!/:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1320) ~[kafka-clients-3.0.0.jar!/:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]

我尝试不同的变体:仅密钥库、仅信任库、删除配置中的最后两个属性(endpoint.identification.algorithm 和 certificate.verification)。我应该尝试创建信任库并导入容器中的证书吗?我不明白 this.What 的正确配置是正确的配置还是使用我拥有的证书的正确方法?

问题出在错误的属性语法上。 正确的做法

spring.kafka.properties.ssl.keystore.type=PKCS12
spring.kafka.properties.ssl.keystore.location=/path/to/keystore.p12
spring.kafka.properties.ssl.keystore.password=password
spring.kafka.properties.ssl.truststore.type=PKCS12
spring.kafka.properties.ssl.truststore.location=/path/to/keystore.p12 (it's the same file, it's correct!!)
spring.kafka.properties.ssl.truststore.password=password 

是的,在密钥库和信任库中使用相同的 p12 文件是绝对可以接受的。