Boto3 get_records:输入中缺少必需的参数:"ShardId"

Boto3 get_records: Missing required parameter in input: "ShardId"

我正在使用 Boto3 和 Kinesis。我希望 Kinesis return 流中的单个项目。

这是我的代码:

kinesis = boto3.client('kinesis')
iterator = kinesis.get_shard_iterator(
     StreamName = "requests",
     ShardIteratorType='LATEST'
)

response = kinesis.get_records(
     ShardIterator = iterator,
     Limit = 1,
     StreamName = "requests",
)

print(response)

失败并出现此错误:

botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "ShardId"

在我初始化迭代器的那一行,所以我知道我缺少一个参数,但是我应该将 ShardId 设置为什么?如果答案因流而异,那么 ShardId 到底是什么?

提前致谢!

核心问题是get_shard_iterator需要传递给它的ShardId参数。


分片 ID 是流中分片的唯一标识符。

要为您的流获取 ShardId(或 ShardId 以供选择),请先调用 describe_stream

stream = client.describe_stream(StreamName='requests')

这将 return 给你一个响应对象,其中包含一个 JsonPath 为 response.StreamDescription.Shards 的流对象列表。

此数组中的每个对象都有一个 ShardId。根据您配置 Kinesis 流的方式,您将在此数组中拥有 1 个或多个分片对象。

如果你有 1 个分片,下面应该允许你选择第一个分片 ID 并继续你的代码的其余部分:

kinesis = boto3.client('kinesis')

stream = client.describe_stream(StreamName='requests')
shard_id = stream['StreamDescription']['Shards'][0]['ShardId']

iterator = kinesis.get_shard_iterator(
     StreamName = "requests",
     ShardIteratorType='LATEST',
     ShardId = shard_id
)

response = kinesis.get_records(
     ShardIterator = iterator,
     Limit = 1
)

print(response)

要了解有关分片本身的更多信息(超出范围),请先阅读这篇精彩的文章 by John Rotenstein