Spring Cloud Stream 中 Kinesis Stream、DynamoDB 和 CloudWatch 的不同凭据

Different credentials for Kinesis Stream, DynamoDB and CloudWatch inside Spring Cloud Stream

我正在使用 Spring Cloud Stream Kinesis 活页夹(版本 2.1.0)

出于安全原因,我必须拥有一组用于 Kinesis 的凭证和另一组用于 DynamoDB 和 CloudWatch 的凭证。

如果 spring.cloud.stream.kinesis.binder.kplKclEnabled 设置为 false,一切正常。但是如果设置为 true 我有异常

com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found

整个堆栈跟踪在 https://pastebin.com/bjvKSzrg

可用

我想启用 KCL,有人知道如何避免这个错误吗?

我知道发生该错误是因为 cloudwatch 和 dynamodb 的用户凭据没有“看到”提到的 Kinesis 流。但是为什么他们需要看到它呢?此外,如果禁用 KCL,它将按预期工作。所以不明白为什么它不能与启用的 KCL

一起使用

这是我的属性文件

spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer



spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration

cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true

spring.cloud.stream.kinesis.binder.kplKclEnabled=true

提到的配置class

@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {
    AwsProperties.Properties properties;

    public KinesisOutputConfiguration(AwsProperties awsProperties) {
        this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
    }

    @Bean(destroyMethod = "shutdown")
    public AmazonKinesisAsync amazonKinesis() {
        RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed.getUrl(), this.properties.getHsdp().getClientId(),
                this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
                this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
                new RestTemplate());
        return AmazonKinesisAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("eu-west-1").build();
    }


    @Bean(destroyMethod = "shutdown")
    public AmazonCloudWatchAsync cloudWatch() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

    @Bean(destroyMethod = "shutdown")
    @Primary
    public AmazonDynamoDBAsync dynamoDBAsync() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

}

您的配置是正确的:如果您需要为这些服务使用不同的凭据,您肯定需要为它们声明自定义 bean。 DynamoDB 和 CloudWatch 是 Kinesis Client Library 所需的服务。它一方面用于管理流分片的偏移量,另一方面用于处理集群中的消费者实例更改以实现分片独占访问。因此,事实上 Kinesis 资源必须可供 DynamoDB 和 CloudWatch 用户使用。

在 Kinesis Client Library 中查看更多信息或询问 AWS 支持:Kinesis Binder 在这方面无能为力...

https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html