Spring Cloud Stream Kafka-Streams unable to configure SSL for my consumer and producer

I am struggling to configure Spring Cloud Stream for Kafka Streams correctly so that it uses SSL with a truststore and keystore.

In my application I have multiple streams running, and the SSL configuration should be the same for all of them.

The application looks like this:

Stream 1: topic1 > topic2

Stream 2: topic2 + topic3 > topic4

Stream 3: topic4 > topic5

I am using the latest Spring Cloud Stream framework with Kafka Streams and Avro models. I was able to configure the schema registry.

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

My application.yaml file looks like this:

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
      bindings:
        stream1-in-0:
          destination: topic1
        stream1-out-0:
          destination: topic2
        stream2-in-0:
          destination: topic2
        stream2-in-1:
          destination: topic3
        stream2-out-0:
          destination: topic4
        stream3-in-0:
          destination: topic4
        stream3-out-0:
          destination: topic5
      kafka:
        binder:
          brokers: kafkabrokerurl.com:9092
          configuration: # not recognized at all
            security.protocol: SSL
            ssl.truststore.location: /mnt/truststore.jks
            ssl.truststore.type: JKS
            ssl.keystore.location: /mnt/keystore.jks
            ssl.keystore.type: JKS
            ssl.enabled.protocols: TLSv1.2
        bindings:
          default:
            consumer:
              resetOffsets: false
              startOffset: latest
          stream1-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream1-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream2-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream2-in-1:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
              materializedAs: sbinfo-outage-mapping-store
          stream2-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream3-in-0:
            consumer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          stream3-out-0:
            producer:
              keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
              valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        streams:
          binder:
            configuration:
              schema.registry.url: https://schemaregistryurl.com # this works

When I start the application with debug logging enabled, it shows that none of the configuration I set is picked up, except for the schema registry.

 o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [kafkabrokerurl.com:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

So the brokers are picked up correctly, but truststore.location, for example, just stays null.

I have tried a lot of different approaches from here and a few other places.

I found an old issue here and tried this approach, but the result was the same: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129

          configuration: # not recognized at all
            "[security.protocol]": SSL
            "[ssl.truststore.location]": /mnt/truststore.jks
            "[ssl.truststore.type]": JKS
            "[ssl.keystore.location]": /mnt/keystore.jks
            "[ssl.keystore.type]": JKS
            "[ssl.enabled.protocols]": TLSv1.2

I read that the configuration does not work when multiple binders are used, so I also tried the approach of defining a named binder, but then it complains that it does not recognize it.

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
      bindings:
        stream1-in-0:
          destination: topic1
          binder: ssl
        stream1-out-0:
          destination: topic2
          binder: ssl
        stream2-in-0:
          destination: topic2
          binder: ssl
        stream2-in-1:
          destination: topic3
          binder: ssl
        stream2-out-0:
          destination: topic4
          binder: ssl
        stream3-in-0:
          destination: topic4
          binder: ssl
        stream3-out-0:
          destination: topic5
          binder: ssl
      binders:
        ssl:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers:
                      configuration:
                        security.protocol: SSL
                        ssl.truststore.location: /mnt/secrets/truststore.jks
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/keystore.jks
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2

Error:

2021-07-15 17:11:14.634 ERROR 5216 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream

I have an annotated @Configuration class in which my 3 streams are declared as Function, BiFunction and Function beans.

I hope someone can help me - thank you.

You are missing the streams element in the property name; you are configuring the Kafka MessageChannel binder instead of the Kafka Streams binder.

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              security:
                protocol: SSL
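
For completeness, a minimal sketch of how the SSL settings from the question could look once they are moved under the Kafka Streams binder (the paths, broker and schema registry URLs are the ones from the question; passwords are omitted here just as they are in the original configuration):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: kafkabrokerurl.com:9092
            configuration:
              # entries in this map are passed on to the Kafka Streams client configuration
              schema.registry.url: https://schemaregistryurl.com
              security.protocol: SSL
              ssl.truststore.location: /mnt/truststore.jks
              ssl.truststore.type: JKS
              ssl.keystore.location: /mnt/keystore.jks
              ssl.keystore.type: JKS
              ssl.enabled.protocols: TLSv1.2

Both the nested security/protocol form shown in the answer and the flat dotted keys used in this sketch bind to the same entries of the configuration map.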