Spring Cloud Stream Kafka-Streams unable to configure SSL for my consumer and producer
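I am struggling to configure Spring Cloud Stream for Kafka Streams so that it uses SSL with a truststore and a keystore.
My application runs several streams, and all of them should share the same SSL configuration.
The application looks like this: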
Stream 1: Topic 1 > Topic 2
Stream 2: Topic 2, Topic 3 > Topic 4
Stream 3: Topic 4 > Topic 5
I use the latest Spring Cloud Stream framework with Kafka Streams and Avro models. I am able to configure the schema registry.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
My application.yaml file looks like this:
spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
      stream2-in-0:
        destination: topic2
      stream2-in-1:
        destination: topic3
      stream2-out-0:
        destination: topic4
      stream3-in-0:
        destination: topic4
      stream3-out-0:
        destination: topic5
    kafka:
      binder:
        brokers: kafkabrokerurl.com:9092
        configuration: # not recognized at all
          security.protocol: SSL
          ssl.truststore.location: /mnt/truststore.jks
          ssl.truststore.type: JKS
          ssl.keystore.location: /mnt/keystore.jks
          ssl.keystore.type: JKS
          ssl.enabled.protocols: TLSv1.2
      bindings:
        default:
          consumer:
            resetOffsets: false
            startOffset: latest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream2-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream2-in-1:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            materializedAs: sbinfo-outage-mapping-store
        stream2-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream3-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream3-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: https://schemaregistryurl.com # this works
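When I start the application with debug logging enabled, the log shows that none of the configuration I set is loaded, except for the schema registry.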
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [kafkabrokerurl.com:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
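So the brokers are loaded correctly, but ssl.truststore.location, for example, simply stays null.
I have tried many different approaches from here and from other places.
I found an old issue here and tried its approach, but the result was the same: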
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129
configuration: # not recognized at all
  "[security.protocol]": SSL
  "[ssl.truststore.location]": /mnt/truststore.jks
  "[ssl.truststore.type]": JKS
  "[ssl.keystore.location]": /mnt/keystore.jks
  "[ssl.keystore.type]": JKS
  "[ssl.enabled.protocols]": TLSv1.2
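I read that the configuration does not work when multiple binders are used, so I also tried defining a named binder, but then the application complains that it does not recognize it.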
spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
        binder: ssl
      stream1-out-0:
        destination: topic2
        binder: ssl
      stream2-in-0:
        destination: topic2
        binder: ssl
      stream2-in-1:
        destination: topic3
        binder: ssl
      stream2-out-0:
        destination: topic4
        binder: ssl
      stream3-in-0:
        destination: topic4
        binder: ssl
      stream3-out-0:
        destination: topic5
        binder: ssl
    binders:
      ssl:
        type: kafka
        environment:
          spring:
            cloud:
              stream:
                kafka:
                  binder:
                    brokers:
                    configuration:
                      security.protocol: SSL
                      ssl.truststore.location: /mnt/secrets/truststore.jks
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/secrets/keystore.jks
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2
Error:
2021-07-15 17:11:14.634 ERROR 5216 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream
I have a class annotated with @Configuration in which my 3 streams are declared as a Function, a BiFunction, and a Function.
I hope someone can help me. Thanks.
You are missing the streams element in the property names. As written, you are configuring the Kafka MessageChannel Binder, not the Kafka Streams binder.
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              security:
                protocol: SSL
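To spell that out for the setup in the question, here is a sketch of how the SSL block might look once it is moved under the streams element, next to the schema.registry.url entry that already works there. The paths and values are copied from the question. Store passwords are left out because the question does not show any, so treat this as a starting point under those assumptions rather than a verified configuration.

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # Already picked up from this location in the question
              schema.registry.url: https://schemaregistryurl.com
              # SSL settings moved here from spring.cloud.stream.kafka.binder.configuration
              security.protocol: SSL
              ssl.truststore.location: /mnt/truststore.jks
              ssl.truststore.type: JKS
              ssl.keystore.location: /mnt/keystore.jks
              ssl.keystore.type: JKS
              ssl.enabled.protocols: TLSv1.2
```

After restarting with debug logging, the config dump in the log should show security.protocol = SSL and non-null store locations if the properties are being picked up.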