如何读取 Kinesis Data Stream 中最早的未处理记录
How to read the oldest unprocessed record in Kinesis Data Stream
我是 AWS 的新手,需要一些指导。
我想处理最早的未处理记录,但我似乎无法正确设置参数。
Current Architecture
对于分片迭代器:
- 我试过 TRIM_HORIZON 这给了我自
开始。
- 我也试过 LATEST,它只给了我一条最新记录。
不确定这些额外的细节是否有帮助,但是...
- 我正在通过 AWS 控制台上的 Lambda 放入我自己的记录
- 我正在通过查看 CloudWatch 中的日志文件对此进行调试
- 我正在通过分片迭代器获取记录(TRIM_HORIZON 和最新)
- 我的 getRecords 限制设置为 100
提前致谢!
没有 "oldest unprocessed record",因为 Kinesis 不知道您处理了什么(例如,您可能已经获取了记录但未对它们执行任何操作)。
如果您使用的是 Kinesis,我强烈 推荐使用 Kinesis Client Library, which has the concept of checkpoints - these are essentially a nice wrapper on top of ShardIterator AFTER_SEQUENCE_NUMBER,即 "oldest uncheckpointed record" - 或者尽可能接近"oldest unprocessed record"。
(你总是可以自己实现这个逻辑,但为什么不重用亚马逊已经为你完成的工作呢?)
我是 AWS 的新手,需要一些指导。
我想处理最早的未处理记录,但我似乎无法正确设置参数。
Current Architecture
对于分片迭代器:
- 我试过 TRIM_HORIZON 这给了我自 开始。
- 我也试过 LATEST,它只给了我一条最新记录。
不确定这些额外的细节是否有帮助,但是...
- 我正在通过 AWS 控制台上的 Lambda 放入我自己的记录
- 我正在通过查看 CloudWatch 中的日志文件对此进行调试
- 我正在通过分片迭代器获取记录(TRIM_HORIZON 和最新)
- 我的 getRecords 限制设置为 100
提前致谢!
没有 "oldest unprocessed record",因为 Kinesis 不知道您处理了什么(例如,您可能已经获取了记录但未对它们执行任何操作)。
如果您使用的是 Kinesis,我强烈 推荐使用 Kinesis Client Library, which has the concept of checkpoints - these are essentially a nice wrapper on top of ShardIterator AFTER_SEQUENCE_NUMBER,即 "oldest uncheckpointed record" - 或者尽可能接近"oldest unprocessed record"。
(你总是可以自己实现这个逻辑,但为什么不重用亚马逊已经为你完成的工作呢?)