AWS Kinesis - 如何从上一个检查点恢复消费
AWS Kinesis - how to resume consuming from last checkpoint
我正在使用 KCL (v2) 将 Kafka 消费者转换为 AWS Kinesis 消费者。在 Kafka 中,偏移量用于帮助消费者跟踪其最近消费的消息。如果我的 Kafka 应用死机,它将使用偏移量在重新启动时从它停止的地方开始消费。
然而,这在 Kinesis 中并不相同。我可以设置 kinesisClientLibConfiguration.withInitialPositionInStream(...)
,但唯一的参数是 TRIM_HORIZON
、LATEST
或 AT_TIMESTAMP
。如果我的 Kinesis 应用死机了,它将不知道在重新启动时从哪里恢复消费。
我的KCL消费者很简单。 main()
方法如下所示:
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
"testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Worker worker = new Worker.Builder()
.recordProcessorFactory(new KCLRecordProcessorFactory())
.config(config)
.build();
而 RecordProcessor
是一个简单的实现:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
}
如果我检查相应的 DynamoDB table,checkpoint
的值设置为 TRIM_HORIZON
,并且不会随着记录被消耗而使用 sequenceIds 进行更新。
确保我使用每条消息的解决方案是什么?
如@kdgregory 所述,KCL 要求用户设置自己的检查点。工作代码:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
try {
processRecordsInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
LOGGER.error("Unable to checkpoint");
}
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
LOGGER.error("Unable to checkpoint");
}
}
我正在使用 KCL (v2) 将 Kafka 消费者转换为 AWS Kinesis 消费者。在 Kafka 中,偏移量用于帮助消费者跟踪其最近消费的消息。如果我的 Kafka 应用死机,它将使用偏移量在重新启动时从它停止的地方开始消费。
然而,这在 Kinesis 中并不相同。我可以设置 kinesisClientLibConfiguration.withInitialPositionInStream(...)
,但唯一的参数是 TRIM_HORIZON
、LATEST
或 AT_TIMESTAMP
。如果我的 Kinesis 应用死机了,它将不知道在重新启动时从哪里恢复消费。
我的KCL消费者很简单。 main()
方法如下所示:
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
"testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Worker worker = new Worker.Builder()
.recordProcessorFactory(new KCLRecordProcessorFactory())
.config(config)
.build();
而 RecordProcessor
是一个简单的实现:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
}
如果我检查相应的 DynamoDB table,checkpoint
的值设置为 TRIM_HORIZON
,并且不会随着记录被消耗而使用 sequenceIds 进行更新。
确保我使用每条消息的解决方案是什么?
如@kdgregory 所述,KCL 要求用户设置自己的检查点。工作代码:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
try {
processRecordsInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
LOGGER.error("Unable to checkpoint");
}
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
LOGGER.error("Unable to checkpoint");
}
}