如何使用 Spring Cloud Stream 禁用 KPL/KCL 的 CloudWatch 指标

How to disable CloudWatch metrics for KPL/KCL with Spring Cloud Stream

我正在使用 Spring Cloud Stream Binder for Kinesis 并启用了 KPL/KCL。我们希望禁用 Cloudwatch 指标,而不必自己管理 KPL 和 KCL 的配置(完全覆盖 bean)。除了 KinesisProducerConfiguration.setMetricsLevel()KinesisClientLibConfiguration.withMetricsLevel(...) 属性之外,我们希望对 KinesisProducerConfiguration 和每个 KinesisClientLibConfiguration 使用相同的 bean 定义。

供参考,这里是 Spring Cloud Stream Kinesis Binder 中定义 AWS bean 的位置:KinesisBinderConfiguration.java

最有效的方法是什么?

感谢任何帮助!谢谢。

框架不提供任何KinesisClientLibConfiguration。您的项目有责任公开这样的 bean 以及您需要的任何选项:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties

Starting with version 2.0.1, beans of KinesisClientLibConfiguration type can be provided in the application context to have a full control over Kinesis Client Library configuration options.

生产方确实被 KinesisBinderConfiguration 中的 KinesisProducerConfiguration bean 覆盖:

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
    KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
    kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
    kinesisProducerConfiguration.setRegion(this.region);
    return kinesisProducerConfiguration;
}

从这里我看不出有什么大问题可以在您自己的配置中声明这样一个 bean 以及您想要拥有的任何其他属性,包括提到的指标。

如果这仍然不适合您,您可以将这个 bean 注入到您自己的 bean 中,然后按照您想要的方式对其进行变异:

@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration)  {
   kinesisProducerConfiguration.setMetricsLevel();
   return null;
}

更新

消费者部分:

这是一个基于我们内部创建的 KCL 默认配置实例的 bean:

@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
    return new KinesisClientLibConfiguration(this.consumerGroup,
                            this.stream,
                            null,
                            null,
                            this.streamInitialSequence,
                            this.kinesisProxyCredentialsProvider,
                            null,
                            null,
                            KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
                            this.workerId,
                            KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
                            this.idleBetweenPolls,
                            false,
                            KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
                            new ClientConfiguration(),
                            new ClientConfiguration(),
                            new ClientConfiguration(),
                            this.consumerBackoff,
                            KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
                            KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
                            null,
                            KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
                            new SimpleRecordsFetcherFactory(),
                            DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
                            DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
                            DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}

无论你在 this. 中看到什么,都必须用你的环境中的相应值替换。在这种情况下,可能 KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE 就是您要查找的内容。

this.consumerGroupthis.stream 必须与您要为其配置消费者的绑定相同。