在 AWS Kinesis 中,如果我们使用 stale/expired SequenceNumber 调用 GetShardIterator 会发生什么?
In AWS Kinesis, what happens if we call GetShardIterator with a stale/expired SequenceNumber?
通常情况下,我们调用GetShardIterator最后读取记录的SequenceNumber(如果我们之前的ShardIterator已经过期)。
假定 SequenceNumber 属于保留期(即默认 24 小时)内的有效记录。
但是如果它在 Kinesis 保留期之外(即 25 小时前)怎么办?那么 Record/SequenceNumber 就会从流中删除。
GetShardIterator会抛出异常吗? What kind of exception?还是会return没有记录?
这对我来说很有趣,所以我试了一下。
TL;DR:它按我的预期工作:以超过 trim 范围的序列号开始等同于从 trim 范围开始。
为了测试,昨天早上我在专用流上发布了一条记录:
aws kinesis put-record --stream-name test-expiration --partition-key irrelevant --data "this is a test"
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49616057638370363251266361760650016619879524195517857794"
}
然后我等了将近24小时(幸好我今天早上没决定睡觉),运行 a utility that I wrote验证记录还在流中:
> kinesis_reader.py test-expiration TRIM_HORIZON 1
{"SequenceNumber": "49616057638370363251266361760650016619879524195517857794", "ApproximateArrivalTimestamp": "2021-03-04T11:33:13.254000+00:00", "Data": "this is a test", "PartitionKey": "irrelevant"}
最后,我从该实用程序中取出代码,将其放入 Jupyter 笔记本中,并在记录在流中超过 24 小时后执行它:
检索分片迭代器:
client = boto3.client('kinesis')
stream_name = "test-expiration"
shard_id = "shardId-000000000000"
sequence_number ="49616057638370363251266361760650016619879524195517857794"
resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', StartingSequenceNumber=sequence_number)
shard_itx = resp['ShardIterator']
这 returned 一个迭代器(我将省略它,因为它有很多不透明的文本)。它想知道它是否会抛出,但没有记录与陈旧迭代器对应的异常。
使用此迭代器检索记录:
client.get_records(ShardIterator=shard_itx)
{'Records': [],
'NextShardIterator': 'AAAAAAAAAAE8Pi3/Ykdggje538B61BxObso1tCZAK4MJIGMc//IGiqJlNdUz2PgTGXhMAW3GLJIFSsaSmWW72Y2qBuwk8+WvKse0Al8DhjBNUmCdB5T/FbUa/67NeUjgSsktcke3ZiCs+rnHXFkAv08rR8egQsJCDmcHkELeEKTaa5pnlMB9kUDB+NT+yFCO7oFNaDdz4OUSH094IN0+Y/w6n5K+XTLsVvhPmM6pYdTv2xllzJJnTA==',
'MillisBehindLatest': 44741000,
'ResponseMetadata': {'RequestId': 'fd58bcf1-6596-0186-a5e4-a7359063274d',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': 'fd58bcf1-6596-0186-a5e4-a7359063274d',
'x-amz-id-2': 'jK9tGfx5eSyi5ysHhnANVn0IvJrwWwYzbxRGTRyFnk1OgjfQ+D2KtzqfF3FXVg5wwBH0m/QBoXdwJ+cEQSeBCktkKgFWOUx5',
'date': 'Fri, 05 Mar 2021 11:44:04 GMT',
'content-type': 'application/x-amz-json-1.1',
'content-length': '315'},
'RetryAttempts': 0}}
如您所见,响应中没有任何记录。
令人惊讶的是,它只表明我比我今天早上添加的最新记录晚了 44741000 毫秒。我本以为会接近 8640000 毫秒(一天)。
作为最后的实验,我写了一个循环来计算我必须读取流多少次才能找到我今天早上放在流上的记录(到现在为止,已经有半小时了) :
count = 0
while True:
count += 1
resp = client.get_records(ShardIterator=shard_itx)
print(f"{count}: {resp['MillisBehindLatest']} millis behind latest")
if resp['Records']:
print(resp)
break
shard_itx = resp['NextShardIterator']
答案:99 次读取,分片迭代器每次前进大约 500 秒。
我打算将此流保留一段时间:我想看看 Kinesis 是否会更新其内部指针,以便后续请求 return 一个更接近当前时间的分片迭代器。
更新
我 运行 再次通过此代码,比第一次尝试晚了大约一个小时。当我使用迭代器检索记录时,它 错误地 告诉我我比最新的晚了 0 毫秒。随后的检索(使用第一个迭代器)报告了 49915000。
道德:不要依赖 MillisBehindLatest
除非你一直在积极处理记录。
通常情况下,我们调用GetShardIterator最后读取记录的SequenceNumber(如果我们之前的ShardIterator已经过期)。
假定 SequenceNumber 属于保留期(即默认 24 小时)内的有效记录。
但是如果它在 Kinesis 保留期之外(即 25 小时前)怎么办?那么 Record/SequenceNumber 就会从流中删除。
GetShardIterator会抛出异常吗? What kind of exception?还是会return没有记录?
这对我来说很有趣,所以我试了一下。
TL;DR:它按我的预期工作:以超过 trim 范围的序列号开始等同于从 trim 范围开始。
为了测试,昨天早上我在专用流上发布了一条记录:
aws kinesis put-record --stream-name test-expiration --partition-key irrelevant --data "this is a test"
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49616057638370363251266361760650016619879524195517857794"
}
然后我等了将近24小时(幸好我今天早上没决定睡觉),运行 a utility that I wrote验证记录还在流中:
> kinesis_reader.py test-expiration TRIM_HORIZON 1
{"SequenceNumber": "49616057638370363251266361760650016619879524195517857794", "ApproximateArrivalTimestamp": "2021-03-04T11:33:13.254000+00:00", "Data": "this is a test", "PartitionKey": "irrelevant"}
最后,我从该实用程序中取出代码,将其放入 Jupyter 笔记本中,并在记录在流中超过 24 小时后执行它:
检索分片迭代器:
client = boto3.client('kinesis') stream_name = "test-expiration" shard_id = "shardId-000000000000" sequence_number ="49616057638370363251266361760650016619879524195517857794" resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', StartingSequenceNumber=sequence_number) shard_itx = resp['ShardIterator']
这 returned 一个迭代器(我将省略它,因为它有很多不透明的文本)。它想知道它是否会抛出,但没有记录与陈旧迭代器对应的异常。
使用此迭代器检索记录:
client.get_records(ShardIterator=shard_itx)
{'Records': [], 'NextShardIterator': 'AAAAAAAAAAE8Pi3/Ykdggje538B61BxObso1tCZAK4MJIGMc//IGiqJlNdUz2PgTGXhMAW3GLJIFSsaSmWW72Y2qBuwk8+WvKse0Al8DhjBNUmCdB5T/FbUa/67NeUjgSsktcke3ZiCs+rnHXFkAv08rR8egQsJCDmcHkELeEKTaa5pnlMB9kUDB+NT+yFCO7oFNaDdz4OUSH094IN0+Y/w6n5K+XTLsVvhPmM6pYdTv2xllzJJnTA==', 'MillisBehindLatest': 44741000, 'ResponseMetadata': {'RequestId': 'fd58bcf1-6596-0186-a5e4-a7359063274d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fd58bcf1-6596-0186-a5e4-a7359063274d', 'x-amz-id-2': 'jK9tGfx5eSyi5ysHhnANVn0IvJrwWwYzbxRGTRyFnk1OgjfQ+D2KtzqfF3FXVg5wwBH0m/QBoXdwJ+cEQSeBCktkKgFWOUx5', 'date': 'Fri, 05 Mar 2021 11:44:04 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '315'}, 'RetryAttempts': 0}}
如您所见,响应中没有任何记录。
令人惊讶的是,它只表明我比我今天早上添加的最新记录晚了 44741000 毫秒。我本以为会接近 8640000 毫秒(一天)。
作为最后的实验,我写了一个循环来计算我必须读取流多少次才能找到我今天早上放在流上的记录(到现在为止,已经有半小时了) :
count = 0
while True:
count += 1
resp = client.get_records(ShardIterator=shard_itx)
print(f"{count}: {resp['MillisBehindLatest']} millis behind latest")
if resp['Records']:
print(resp)
break
shard_itx = resp['NextShardIterator']
答案:99 次读取,分片迭代器每次前进大约 500 秒。
我打算将此流保留一段时间:我想看看 Kinesis 是否会更新其内部指针,以便后续请求 return 一个更接近当前时间的分片迭代器。
更新
我 运行 再次通过此代码,比第一次尝试晚了大约一个小时。当我使用迭代器检索记录时,它 错误地 告诉我我比最新的晚了 0 毫秒。随后的检索(使用第一个迭代器)报告了 49915000。
道德:不要依赖 MillisBehindLatest
除非你一直在积极处理记录。