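Records are not consumed when adding checkpointer
I have the following configuration for KinesisMessageDrivenChannelAdapter. When I remove dynamoDbMetaDataStore
as the checkpoint store, messages are received correctly, but when I add it back, the records are always empty.
I debugged the code, and KinesisMessageDrivenChannelAdapter.processTask()
at line 776 (version 2.0.0.M2) returns empty records.
Update: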
public DynamoDbMetaDataStore dynamoDbMetaDataStore() {
    String url = consumerClientProperties.getDynamoDB().getUrl();
    final AmazonDynamoDBAsync amazonDynamoDB = AmazonDynamoDBAsyncClientBuilder.standard()
            .withEndpointConfiguration(new EndpointConfiguration(
                    url,
                    Regions.fromName(awsRegion).getName()))
            .withClientConfiguration(new ClientConfiguration()
                    .withMaxErrorRetry(consumerClientProperties.getDynamoDB().getRetries())
                    .withConnectionTimeout(consumerClientProperties.getDynamoDB().getConnectionTimeout()))
            .build();
    DynamoDbMetaDataStore dynamoDbMetaDataStore = new DynamoDbMetaDataStore(amazonDynamoDB, "consumer-test");
    return dynamoDbMetaDataStore;
}

public KinesisMessageDrivenChannelAdapter kinesisInboundChannel(
        AmazonKinesis amazonKinesis, String[] streamNames) {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(kinesisReceiveChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setConsumerGroup(consumerClientProperties.getName());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.record);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
}
Thanks.
I suggest you test your solution against the latest 2.0.0.BUILD-SNAPSHOT.
There is already an option like this:
/**
* Specify a {@link LockRegistry} for an exclusive access to provided streams.
* This is not used when shards-based configuration is provided.
* @param lockRegistry the {@link LockRegistry} to use.
* @since 2.0
*/
public void setLockRegistry(LockRegistry lockRegistry) {
You need to inject a DynamoDbLockRegistry there for better checkpoint management.
For that, you also need to add this dependency:
compile("com.amazonaws:dynamodb-lock-client:1.0.0")
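For illustration, the wiring described above might look roughly like the sketch below. It is not taken from the project's documentation. It assumes that the snapshot's DynamoDbLockRegistry offers a constructor taking only the AmazonDynamoDBAsync client, and that the checkpoint store and output channel are the beans from the question. The class name KinesisLockingConfigSketch and the stream name "example-stream" are hypothetical.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.kinesis.AmazonKinesis;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.lock.DynamoDbLockRegistry;
import org.springframework.integration.aws.metadata.DynamoDbMetaDataStore;
import org.springframework.messaging.MessageChannel;

// Hypothetical configuration class; only the lock-registry wiring is the point here.
@Configuration
class KinesisLockingConfigSketch {

    @Bean
    DynamoDbLockRegistry dynamoDbLockRegistry(AmazonDynamoDBAsync amazonDynamoDB) {
        // Assumption: a constructor accepting just the async client exists
        // in the version you use; check the class before relying on this.
        return new DynamoDbLockRegistry(amazonDynamoDB);
    }

    @Bean
    KinesisMessageDrivenChannelAdapter kinesisInboundChannel(
            AmazonKinesis amazonKinesis,
            DynamoDbMetaDataStore checkpointStore,
            DynamoDbLockRegistry lockRegistry,
            MessageChannel kinesisReceiveChannel) {
        // Stream-name based configuration, so the lock registry is applied
        // (per the Javadoc, it is not used with shards-based configuration).
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "example-stream");
        adapter.setOutputChannel(kinesisReceiveChannel);
        adapter.setCheckpointStore(checkpointStore);
        adapter.setLockRegistry(lockRegistry);
        return adapter;
    }
}
```

The remaining setters from the question (consumer group, checkpoint mode, listener mode and so on) would stay as they are; they are left out only to keep the sketch short.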
There may indeed be some problems with the filtering in M2, but...
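To picture why a checkpoint store can lead to empty batches, here is a minimal, self-contained sketch. It assumes the adapter drops every record whose sequence number is not greater than the checkpoint stored for the consumer group. That is a simplified model for illustration, not the adapter's actual code, and the class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of checkpoint-based record filtering.
class CheckpointFilterSketch {

    // Keeps only sequence numbers strictly greater than the stored checkpoint.
    // A null checkpoint models "no checkpoint stored", so nothing is dropped.
    static List<String> filter(List<String> sequenceNumbers, String checkpoint) {
        if (checkpoint == null) {
            return new ArrayList<>(sequenceNumbers);
        }
        // Kinesis sequence numbers are large decimal strings, so compare numerically.
        BigInteger last = new BigInteger(checkpoint);
        List<String> kept = new ArrayList<>();
        for (String seq : sequenceNumbers) {
            if (new BigInteger(seq).compareTo(last) > 0) {
                kept.add(seq);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("100", "101", "102");
        System.out.println(filter(batch, null));   // prints [100, 101, 102]
        System.out.println(filter(batch, "101"));  // prints [102]
        System.out.println(filter(batch, "500"));  // prints []
    }
}
```

Under this model, a leftover entry in the DynamoDB table with a sequence number higher than anything currently being read would make every batch come back empty, so it may be worth inspecting the stored checkpoint for the consumer group.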