为什么关闭分片时 Amazon Kinesis 流上需要检查点?
Why checkpoint is needed on an Amazon Kinesis stream when shutting down a shard?
将一个分片拆分为 2 个子分片时,父分片关闭。当发生这种情况时,预计记录处理器(此处使用 KCL)会检查点,如下面的 KCL 源代码所示:
try {
recordProcessor.shutdown(recordProcessorCheckpointer, reason);
String lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (reason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(SentinelCheckpoint.SHARD_END.toString()))) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId());
}
}
问题是:
这个关卡必不可少吗?
如果记录处理器不检查点并吸收异常会怎样?
我问的原因是因为在我的用例中我想确保流中的每条记录都已处理到 s3,现在如果分片关闭,可能还有尚未刷新的项目因此我想确保他们会对子碎片的新 consumer/worker 感到不满?
如果我checkpoint他们也不会反感。
有什么想法吗?
提前致谢。
物品不会在碎片之间移动。重新分片后,新记录放入新分片,但旧记录永远不会从父分片转移,也不再向(现已关闭的)父分片添加新记录。即使在关闭后,数据也会在其正常的 24 小时生命周期内保留在父分片中。您的记录处理器只有在到达父分片数据的末尾后才会关闭。
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-after-resharding.html
顺便说一句,您可能知道 SDK API 很困难,而且客户端库也好不到哪儿去。试试连接器库,它要好得多 API 并且包括一个 S3 归档应用程序示例。
将一个分片拆分为 2 个子分片时,父分片关闭。当发生这种情况时,预计记录处理器(此处使用 KCL)会检查点,如下面的 KCL 源代码所示:
try {
recordProcessor.shutdown(recordProcessorCheckpointer, reason);
String lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (reason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(SentinelCheckpoint.SHARD_END.toString()))) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId());
}
}
问题是:
这个关卡必不可少吗?
如果记录处理器不检查点并吸收异常会怎样?
我问的原因是因为在我的用例中我想确保流中的每条记录都已处理到 s3,现在如果分片关闭,可能还有尚未刷新的项目因此我想确保他们会对子碎片的新 consumer/worker 感到不满?
如果我checkpoint他们也不会反感。
有什么想法吗?
提前致谢。
物品不会在碎片之间移动。重新分片后,新记录放入新分片,但旧记录永远不会从父分片转移,也不再向(现已关闭的)父分片添加新记录。即使在关闭后,数据也会在其正常的 24 小时生命周期内保留在父分片中。您的记录处理器只有在到达父分片数据的末尾后才会关闭。
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-after-resharding.html
顺便说一句,您可能知道 SDK API 很困难,而且客户端库也好不到哪儿去。试试连接器库,它要好得多 API 并且包括一个 S3 归档应用程序示例。