AWS Kinesis KCL 跳过启动前添加的记录
AWS Kinesis KCL skips records added before startup
我开始同时使用 KPL
和 KCL
在服务之间交换数据。但是每当 consumer service
离线时, KPL
发送的所有数据将永远丢失。所以我只得到那些在 consumer service
启动并且它的 shardConsumer
准备就绪时发送的数据块。我需要从最后消耗的点或以其他方式处理留下的数据。
开始
这是我的 ShardProcessor
代码:
@Override
public void initialize(InitializationInput initializationInput) {
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.records()
.forEach(record -> {
//my logic
});
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shard Ended", e);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shutdown Requested", e);
}
}
和配置代码:
public void configure(String streamName, ShardRecordProcessorFactory factory) {
Region region = Region.of(awsRegion);
KinesisAsyncClient kinesisAsyncClient =
KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder =
new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
UUID.randomUUID().toString(), factory);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
有两种方法可以解决这个问题。一、问题
默认情况下,KCL 配置为从 LATEST
开始读取流。此设置告诉流 reader 在“当前”时间戳获取流。
在您的情况下,您在该流中拥有“现在”之前放置在那里的数据。为了读取该数据,您可能需要考虑读取流中最早的数据。如果您设置默认流,该流将存储数据 24 小时。
要从该流的“开头”读取数据,或者在启动 KCL 应用程序前 24 小时,您需要将流 reader 设置为 TRIM_HORIZON
。此设置称为 initialPositionInStream
。你可以阅读它 here. There are three different settings documented in the API.
要解决您的问题,如第一个 link 中所述,首选方法是向属性文件添加一个条目。如果您不使用属性文件,您只需将其添加到您的 Scheduler
ctor:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
此设置要记住的一件事是当流中有数据并且从 TRIM_HORIZON
开始时的启动功能。在这种情况下,RecordProcessor
将尽可能快地遍历记录。这可能会在 Kinesis API 甚至下游系统(无论您在 RecordProcessor 拥有数据后将数据发送到何处)造成性能问题,
我开始同时使用 KPL
和 KCL
在服务之间交换数据。但是每当 consumer service
离线时, KPL
发送的所有数据将永远丢失。所以我只得到那些在 consumer service
启动并且它的 shardConsumer
准备就绪时发送的数据块。我需要从最后消耗的点或以其他方式处理留下的数据。
这是我的 ShardProcessor
代码:
@Override
public void initialize(InitializationInput initializationInput) {
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.records()
.forEach(record -> {
//my logic
});
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shard Ended", e);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shutdown Requested", e);
}
}
和配置代码:
public void configure(String streamName, ShardRecordProcessorFactory factory) {
Region region = Region.of(awsRegion);
KinesisAsyncClient kinesisAsyncClient =
KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder =
new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
UUID.randomUUID().toString(), factory);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
有两种方法可以解决这个问题。一、问题
默认情况下,KCL 配置为从 LATEST
开始读取流。此设置告诉流 reader 在“当前”时间戳获取流。
在您的情况下,您在该流中拥有“现在”之前放置在那里的数据。为了读取该数据,您可能需要考虑读取流中最早的数据。如果您设置默认流,该流将存储数据 24 小时。
要从该流的“开头”读取数据,或者在启动 KCL 应用程序前 24 小时,您需要将流 reader 设置为 TRIM_HORIZON
。此设置称为 initialPositionInStream
。你可以阅读它 here. There are three different settings documented in the API.
要解决您的问题,如第一个 link 中所述,首选方法是向属性文件添加一个条目。如果您不使用属性文件,您只需将其添加到您的 Scheduler
ctor:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
此设置要记住的一件事是当流中有数据并且从 TRIM_HORIZON
开始时的启动功能。在这种情况下,RecordProcessor
将尽可能快地遍历记录。这可能会在 Kinesis API 甚至下游系统(无论您在 RecordProcessor 拥有数据后将数据发送到何处)造成性能问题,