AWS Kinesis - how to resume consuming from last checkpoint

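I'm converting a Kafka consumer to an AWS Kinesis consumer, using KCL (v2). In Kafka, offsets help a consumer keep track of its most recently consumed messages. If my Kafka app dies, it uses the offsets to start consuming from where it left off when it restarts.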

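However, this is not the same in Kinesis. I can set kinesisClientLibConfiguration.withInitialPositionInStream(...), but the only arguments are TRIM_HORIZON, LATEST, or AT_TIMESTAMP. If my Kinesis app dies, it won't know where to resume consuming from when it restarts.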

My KCL consumer is very simple. The main() method looks like this:

KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
            "testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

Worker worker = new Worker.Builder()
            .recordProcessorFactory(new KCLRecordProcessorFactory())
            .config(config)
            .build();

The RecordProcessor is a simple implementation:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", records.size());
    records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
    LOGGER.info("Shutting down input");
}

If I check the corresponding DynamoDB table, the checkpoint is set to TRIM_HORIZON and is not updated with sequence numbers as records are consumed.

What is the solution to ensure that I consume every message?

As @kdgregory noted, KCL requires the user to set their own checkpoints. Working code:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", records.size());
    records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
            r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
    try {
        // Persist progress so a restarted worker resumes after this batch.
        processRecordsInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
        LOGGER.error("Unable to checkpoint", e);
    }
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
    LOGGER.info("Shutting down input");
    try {
        // Checkpointing fails with ShutdownException if the lease was already lost.
        shutdownInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
        LOGGER.error("Unable to checkpoint", e);
    }
}
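
To see why the explicit checkpoint() call makes the difference, here is a minimal, library-free sketch of the resume behaviour. It does not use KCL at all: CheckpointSketch, CheckpointStore and consume are hypothetical names made up for illustration, the in-memory map stands in for the DynamoDB lease table, and a list index stands in for a Kinesis sequence number. The assumption it models is that the initial position (index 0 here, playing the role of TRIM_HORIZON) is only used when no checkpoint has been stored for the shard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointSketch {

    /** Hypothetical stand-in for the lease table: shard id -> index of the next record to read. */
    public static final class CheckpointStore {
        private final Map<String, Integer> nextIndexByShard = new HashMap<>();

        public Integer load(String shardId) {
            return nextIndexByShard.get(shardId);
        }

        public void save(String shardId, int nextIndex) {
            nextIndexByShard.put(shardId, nextIndex);
        }
    }

    /**
     * Simulates one run of a consumer: reads up to maxRecords starting at the
     * stored checkpoint, or at index 0 (the TRIM_HORIZON analogue) if none exists.
     * Progress is persisted only when checkpointAfterBatch is true.
     */
    public static List<String> consume(List<String> shard, String shardId, CheckpointStore store,
                                       int maxRecords, boolean checkpointAfterBatch) {
        Integer saved = store.load(shardId);
        int start = (saved != null) ? saved : 0;
        int end = Math.min(shard.size(), start + maxRecords);
        List<String> processed = new ArrayList<>(shard.subList(start, end));
        if (checkpointAfterBatch) {
            store.save(shardId, end);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> shard = Arrays.asList("r0", "r1", "r2", "r3", "r4");
        CheckpointStore store = new CheckpointStore();

        // Without checkpointing, a restart begins at the initial position again.
        System.out.println(consume(shard, "shard-0", store, 2, false)); // [r0, r1]
        System.out.println(consume(shard, "shard-0", store, 2, false)); // [r0, r1]

        // With checkpointing, the next run resumes where the previous one stopped.
        System.out.println(consume(shard, "shard-0", store, 2, true));  // [r0, r1]
        System.out.println(consume(shard, "shard-0", store, 2, true));  // [r2, r3]
    }
}
```

The first two runs mirror the processor from the question, where the stored checkpoint never moves and every restart re-reads from the start. The last two mirror the working code, where each processed batch is followed by a checkpoint.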