Spring 云流生产者重试和错误处理

Spring cloud stream producer retries and error handling

我已经设置了一个 spring 云流 kafka 生产者和消费者,并且有 3 个 kafka 经纪人 运行 。我已将 min.insync.replicas 设置为 4 以查看生产者错误处理的工作原理。 messagechannel.send 立即调用 returns 并且生产者日志一直说 NOT_ENOUGH_REPLICAS 这很好并且符合预期。

server.port: 9050
spring:
  cloud:
    stream:
      bindings:
        errorChannel:
          destination: error-topic
        output:
          destination: stream-topic
          group: top-group
          producer:
            errorChannelEnabled: true
      kafka:
        bindings:
          output:
            producer:
              retries: 3
              sync: false
        binder:
          autoCreateTopics: true
          configuration:
            value:
              serializer: com.example.kafkapublisher.MySerializer
          producer-properties:
            acks: all
spring.cloud.stream.kafka.bindings.errorChannel.consumer.enableDlq: true
  

以上是我的生产者配置。虽然retries设置为3,但是producer一直在重试很多次。虽然 sync 设置为 true,但发送调用会立即发出。虽然定义了错误通道和目标,并且 errorChannelEnabled 设置为 true,但我没有在错误主题 my-error 中看到失败消息,也没有创建错误主题。请求你的帮助

任意 Kafka 生产者属性进入 ...producer.configuration 属性。

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties

configuration

Map with a key/value pair containing generic Kafka producer properties. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.

Default: Empty map.