Spring Cloud Stream 中 Kinesis Stream、DynamoDB 和 CloudWatch 的不同凭据
Different credentials for Kinesis Stream, DynamoDB and CloudWatch inside Spring Cloud Stream
我正在使用 Spring Cloud Stream Kinesis 活页夹(版本 2.1.0)
出于安全原因,我必须拥有一组用于 Kinesis 的凭证和另一组用于 DynamoDB 和 CloudWatch 的凭证。
如果 spring.cloud.stream.kinesis.binder.kplKclEnabled
设置为 false,一切正常。但是如果设置为 true 我有异常
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found
整个堆栈跟踪在 https://pastebin.com/bjvKSzrg
可用
我想启用 KCL,有人知道如何避免这个错误吗?
我知道发生该错误是因为 cloudwatch 和 dynamodb 的用户凭据没有“看到”提到的 Kinesis 流。但是为什么他们需要看到它呢?此外,如果禁用 KCL,它将按预期工作。所以不明白为什么它不能与启用的 KCL
一起使用
这是我的属性文件
spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer
spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration
cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
提到的配置class
@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {
AwsProperties.Properties properties;
public KinesisOutputConfiguration(AwsProperties awsProperties) {
this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
}
@Bean(destroyMethod = "shutdown")
public AmazonKinesisAsync amazonKinesis() {
RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed.getUrl(), this.properties.getHsdp().getClientId(),
this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
new RestTemplate());
return AmazonKinesisAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("eu-west-1").build();
}
@Bean(destroyMethod = "shutdown")
public AmazonCloudWatchAsync cloudWatch() {
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
this.properties.getSecretKey()));
return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
}
@Bean(destroyMethod = "shutdown")
@Primary
public AmazonDynamoDBAsync dynamoDBAsync() {
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
this.properties.getSecretKey()));
return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
}
}
您的配置是正确的:如果您需要为这些服务使用不同的凭据,您肯定需要为它们声明自定义 bean。 DynamoDB 和 CloudWatch 是 Kinesis Client Library 所需的服务。它一方面用于管理流分片的偏移量,另一方面用于处理集群中的消费者实例更改以实现分片独占访问。因此,事实上 Kinesis 资源必须可供 DynamoDB 和 CloudWatch 用户使用。
在 Kinesis Client Library 中查看更多信息或询问 AWS 支持:Kinesis Binder 在这方面无能为力...
https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html
我正在使用 Spring Cloud Stream Kinesis 活页夹(版本 2.1.0)
出于安全原因,我必须拥有一组用于 Kinesis 的凭证和另一组用于 DynamoDB 和 CloudWatch 的凭证。
如果 spring.cloud.stream.kinesis.binder.kplKclEnabled
设置为 false,一切正常。但是如果设置为 true 我有异常
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found
整个堆栈跟踪在 https://pastebin.com/bjvKSzrg
可用我想启用 KCL,有人知道如何避免这个错误吗?
我知道发生该错误是因为 cloudwatch 和 dynamodb 的用户凭据没有“看到”提到的 Kinesis 流。但是为什么他们需要看到它呢?此外,如果禁用 KCL,它将按预期工作。所以不明白为什么它不能与启用的 KCL
一起使用这是我的属性文件
spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer
spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration
cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
提到的配置class
@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {
AwsProperties.Properties properties;
public KinesisOutputConfiguration(AwsProperties awsProperties) {
this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
}
@Bean(destroyMethod = "shutdown")
public AmazonKinesisAsync amazonKinesis() {
RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed.getUrl(), this.properties.getHsdp().getClientId(),
this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
new RestTemplate());
return AmazonKinesisAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("eu-west-1").build();
}
@Bean(destroyMethod = "shutdown")
public AmazonCloudWatchAsync cloudWatch() {
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
this.properties.getSecretKey()));
return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
}
@Bean(destroyMethod = "shutdown")
@Primary
public AmazonDynamoDBAsync dynamoDBAsync() {
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
this.properties.getSecretKey()));
return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
}
}
您的配置是正确的:如果您需要为这些服务使用不同的凭据,您肯定需要为它们声明自定义 bean。 DynamoDB 和 CloudWatch 是 Kinesis Client Library 所需的服务。它一方面用于管理流分片的偏移量,另一方面用于处理集群中的消费者实例更改以实现分片独占访问。因此,事实上 Kinesis 资源必须可供 DynamoDB 和 CloudWatch 用户使用。
在 Kinesis Client Library 中查看更多信息或询问 AWS 支持:Kinesis Binder 在这方面无能为力...
https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html