Spring 集成 Kinesis 适配器和消费者组
Spring Integration Kinesis adapter and consumer groups
我有一个使用 spring-integration-aws
版本 1.1.0.RELEASE
开发的运动消费者应用程序。
在我的测试中,我 运行 这个应用程序的两个实例在同一个消费者组中,并从具有两个分片的流中消费。在我的测试中,我意识到 KinesisMessageDrivenChannelAdapter
会以三种方式分发消息:
- 所有消息都发送给一个消费者
- 分发给两个消费者的消息(不均匀)
- 两个消费者都收到了相同的消息
从生产者端来看,消息在两个分片之间均匀分布。我想知道 kinesis 适配器如何在消费者之间分发消息,如果支持,我如何在消费者之间均匀分发消息。
谢谢
更新(适配器配置)
@Bean
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelAdapter(
AmazonKinesis amazonKinesis) {
String[] streamNames = this.consumerClientProperties.getKinesis().getStreamNames();
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
adapter.setConverter(null);
adapter.setOutputChannel(new QueueChannel());
adapter.setCheckpointStore(dynamoDbMetaDataStore());
adapter.setCheckpointMode(CheckpointMode.record);
adapter.setStartTimeout(10000);
adapter.setConsumerGroup(consumerClientProperties.getName());
adapter.setListenerMode(ListenerMode.record);
adapter.setDescribeStreamRetries(1);
return adapter;
}
@Bean
public DynamoDbMetadataStore dynamoDbMetaDataStore() {
DynamoDbMetadataStore dynamoDbMetaDataStore = new DynamoDbMetadataStore(amazonDynamoDB(),
consumerClientProperties.getName());
return dynamoDbMetaDataStore;
}
建议大家升级到最新的Spring Integration AWS 2.0
:https://spring.io/blog/2018/08/21/spring-integration-for-aws-2-0-ga-and-spring-cloud-stream-kinesis-binder-1-0-ga
我们在 Kinesis 消费者级别完成了大量修复,现在我们有领导选举不要订阅同一个分片超过一次。
我们的想法是在处理记录时严格排序,因此每个集群只有一个线程可以访问一个分片。不过,该线程可能会处理多个分片。
无论如何,如果您使用应用程序的两个实例,您需要注入一个 MetadataStore
共享数据,例如DynamoDbMetadataStore
.
我有一个使用 spring-integration-aws
版本 1.1.0.RELEASE
开发的运动消费者应用程序。
在我的测试中,我 运行 这个应用程序的两个实例在同一个消费者组中,并从具有两个分片的流中消费。在我的测试中,我意识到 KinesisMessageDrivenChannelAdapter
会以三种方式分发消息:
- 所有消息都发送给一个消费者
- 分发给两个消费者的消息(不均匀)
- 两个消费者都收到了相同的消息
从生产者端来看,消息在两个分片之间均匀分布。我想知道 kinesis 适配器如何在消费者之间分发消息,如果支持,我如何在消费者之间均匀分发消息。
谢谢
更新(适配器配置)
@Bean
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelAdapter(
AmazonKinesis amazonKinesis) {
String[] streamNames = this.consumerClientProperties.getKinesis().getStreamNames();
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
adapter.setConverter(null);
adapter.setOutputChannel(new QueueChannel());
adapter.setCheckpointStore(dynamoDbMetaDataStore());
adapter.setCheckpointMode(CheckpointMode.record);
adapter.setStartTimeout(10000);
adapter.setConsumerGroup(consumerClientProperties.getName());
adapter.setListenerMode(ListenerMode.record);
adapter.setDescribeStreamRetries(1);
return adapter;
}
@Bean
public DynamoDbMetadataStore dynamoDbMetaDataStore() {
DynamoDbMetadataStore dynamoDbMetaDataStore = new DynamoDbMetadataStore(amazonDynamoDB(),
consumerClientProperties.getName());
return dynamoDbMetaDataStore;
}
建议大家升级到最新的Spring Integration AWS 2.0
:https://spring.io/blog/2018/08/21/spring-integration-for-aws-2-0-ga-and-spring-cloud-stream-kinesis-binder-1-0-ga
我们在 Kinesis 消费者级别完成了大量修复,现在我们有领导选举不要订阅同一个分片超过一次。
我们的想法是在处理记录时严格排序,因此每个集群只有一个线程可以访问一个分片。不过,该线程可能会处理多个分片。
无论如何,如果您使用应用程序的两个实例,您需要注入一个 MetadataStore
共享数据,例如DynamoDbMetadataStore
.