如何使用 Spring Cloud Stream 禁用 KPL/KCL 的 CloudWatch 指标
How to disable CloudWatch metrics for KPL/KCL with Spring Cloud Stream
我正在使用 Spring Cloud Stream Binder for Kinesis 并启用了 KPL/KCL。我们希望禁用 Cloudwatch 指标,而不必自己管理 KPL 和 KCL 的配置(完全覆盖 bean)。除了 KinesisProducerConfiguration.setMetricsLevel()
和 KinesisClientLibConfiguration.withMetricsLevel(...)
属性之外,我们希望对 KinesisProducerConfiguration
和每个 KinesisClientLibConfiguration
使用相同的 bean 定义。
供参考,这里是 Spring Cloud Stream Kinesis Binder 中定义 AWS bean 的位置:KinesisBinderConfiguration.java
最有效的方法是什么?
感谢任何帮助!谢谢。
框架不提供任何KinesisClientLibConfiguration
。您的项目有责任公开这样的 bean 以及您需要的任何选项:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties
Starting with version 2.0.1, beans of KinesisClientLibConfiguration
type can be provided in the application context to have a full control over Kinesis Client Library configuration options.
生产方确实被 KinesisBinderConfiguration
中的 KinesisProducerConfiguration
bean 覆盖:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
kinesisProducerConfiguration.setRegion(this.region);
return kinesisProducerConfiguration;
}
从这里我看不出有什么大问题可以在您自己的配置中声明这样一个 bean 以及您想要拥有的任何其他属性,包括提到的指标。
如果这仍然不适合您,您可以将这个 bean 注入到您自己的 bean 中,然后按照您想要的方式对其进行变异:
@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration) {
kinesisProducerConfiguration.setMetricsLevel();
return null;
}
更新
消费者部分:
这是一个基于我们内部创建的 KCL 默认配置实例的 bean:
@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
return new KinesisClientLibConfiguration(this.consumerGroup,
this.stream,
null,
null,
this.streamInitialSequence,
this.kinesisProxyCredentialsProvider,
null,
null,
KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
this.workerId,
KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
this.idleBetweenPolls,
false,
KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
this.consumerBackoff,
KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null,
KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
new SimpleRecordsFetcherFactory(),
DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
无论你在 this.
中看到什么,都必须用你的环境中的相应值替换。在这种情况下,可能 KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE
就是您要查找的内容。
this.consumerGroup
和 this.stream
必须与您要为其配置消费者的绑定相同。
我正在使用 Spring Cloud Stream Binder for Kinesis 并启用了 KPL/KCL。我们希望禁用 Cloudwatch 指标,而不必自己管理 KPL 和 KCL 的配置(完全覆盖 bean)。除了 KinesisProducerConfiguration.setMetricsLevel()
和 KinesisClientLibConfiguration.withMetricsLevel(...)
属性之外,我们希望对 KinesisProducerConfiguration
和每个 KinesisClientLibConfiguration
使用相同的 bean 定义。
供参考,这里是 Spring Cloud Stream Kinesis Binder 中定义 AWS bean 的位置:KinesisBinderConfiguration.java
最有效的方法是什么?
感谢任何帮助!谢谢。
框架不提供任何KinesisClientLibConfiguration
。您的项目有责任公开这样的 bean 以及您需要的任何选项:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties
Starting with version 2.0.1, beans of
KinesisClientLibConfiguration
type can be provided in the application context to have a full control over Kinesis Client Library configuration options.
生产方确实被 KinesisBinderConfiguration
中的 KinesisProducerConfiguration
bean 覆盖:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
kinesisProducerConfiguration.setRegion(this.region);
return kinesisProducerConfiguration;
}
从这里我看不出有什么大问题可以在您自己的配置中声明这样一个 bean 以及您想要拥有的任何其他属性,包括提到的指标。
如果这仍然不适合您,您可以将这个 bean 注入到您自己的 bean 中,然后按照您想要的方式对其进行变异:
@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration) {
kinesisProducerConfiguration.setMetricsLevel();
return null;
}
更新
消费者部分:
这是一个基于我们内部创建的 KCL 默认配置实例的 bean:
@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
return new KinesisClientLibConfiguration(this.consumerGroup,
this.stream,
null,
null,
this.streamInitialSequence,
this.kinesisProxyCredentialsProvider,
null,
null,
KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
this.workerId,
KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
this.idleBetweenPolls,
false,
KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
this.consumerBackoff,
KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null,
KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
new SimpleRecordsFetcherFactory(),
DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
无论你在 this.
中看到什么,都必须用你的环境中的相应值替换。在这种情况下,可能 KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE
就是您要查找的内容。
this.consumerGroup
和 this.stream
必须与您要为其配置消费者的绑定相同。