使用 PutRecords 将多条记录加载到 Kinesis - 如果发生故障,如何仅重新发送失败的记录?
Loading multiple records to Kinesis using PutRecords - how to re-send only failed records in case of failure?
我正在使用 Lambda 将数据记录加载到 Kinesis 中,并且经常想要添加多达 500K 条记录,我将这些记录分成 500 条的块,并使用 Boto 的 put_records
方法将它们发送到 Kinesis。我有时会看到由于超过允许的吞吐量而导致的失败。
发生这种情况时重试的最佳方法是什么?理想情况下,我不希望数据流中有重复的消息,所以我不想简单地重新发送所有 500 条记录,但我正在努力了解如何只重试失败的消息。 put_records
方法的响应似乎不是很有用。
我可以相信响应记录列表的顺序与我传递给 putRecords 的列表的顺序相同吗?
我知道我可以增加分片的数量,但我想显着增加将数据加载到此 Kinesis 流的并行 Lambda 函数的数量。我们计划根据源系统对数据进行分区,我不能保证多个函数不会将数据写入同一个分片并超过允许的吞吐量。因此,我认为增加分片不会消除对重试策略的需求。
或者,有人知道 KPL 是否会自动为我处理这个问题吗?
Can I rely on the order of the response Records list being in the same order as the list I pass to putRecords?
是的。您将不得不依赖响应的顺序。响应记录的顺序与请求记录的顺序相同。
请检查 putrecords
回复,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html。
Records:
An array of successfully and unsuccessfully processed record results, correlated with the request by natural ordering. A record that is successfully added to a stream includes SequenceNumber and ShardId in the result. A record that fails to be added to a stream includes ErrorCode and ErrorMessage in the result.
要重试失败的记录,您必须开发自己的重试机制。我在 python 中使用递归函数编写了重试机制,并按以下方式在重试之间进行增量等待。
import boto3
import time
kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"
def send_to_stream(kinesis_records, retry_count):
put_response = kinesis_client.put_records(
Records=kinesis_records,
StreamName=KINESIS_STREAM_NAME
)
failed_count = put_response['FailedRecordCount']
if failed_count > 0:
if retry_count > 0:
retry_kinesis_records = []
for idx, record in enumerate(put_response['Records']):
if 'ErrorCode' in record:
retry_kinesis_records.append(kinesis_records[idx])
time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
send_to_stream(retry_kinesis_records, retry_count - 1)
else:
print(f'Not able to put records after retries. Records = {put_response["Records"]}')
在上面的示例中,您可以根据需要更改 KINESIS_RETRY_COUNT
和 KINESIS_RETRY_WAIT_IN_SEC
。此外,您还必须确保您的 lambda 超时足以重试。
Alternatively, does anybody know if KPL will automatically handle this
issue for me?
我不确定 KPL,但从文档来看它似乎有自己的重试机制。 https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html
虽然您绝对应该处理失败并重新发送它们,但将要重新发送的额外记录数量降至最低的一种方法是简单地发送 500 条记录,如果您要发送更多记录,请延迟 500 毫秒,然后再发送下一批.
每 500 条记录等待 500 毫秒会将您限制为 1000 records/sec,这是 Kinesis PutRecords 限制。保持在这个限制之下将最大限度地减少必须多次发送的记录数。
一次只处理较大列表中的 500 条记录也可以使重试逻辑更容易,因为任何失败的记录都可以简单地附加到主列表的末尾,循环时将在此处重试检查主列表中是否还有要发送给 Kinesis 的记录。
如果主列表在每次尝试发送 500 条记录时都没有变小,请记住检查以中止,如果每次至少有一条记录失败,就会发生这种情况。最终它将成为列表中的最后一个,并将永远不断地发送,除非进行此检查。
请注意,这适用于一个分片,如果您有更多分片,则可以相应地调整这些限制。
我正在使用 Lambda 将数据记录加载到 Kinesis 中,并且经常想要添加多达 500K 条记录,我将这些记录分成 500 条的块,并使用 Boto 的 put_records
方法将它们发送到 Kinesis。我有时会看到由于超过允许的吞吐量而导致的失败。
发生这种情况时重试的最佳方法是什么?理想情况下,我不希望数据流中有重复的消息,所以我不想简单地重新发送所有 500 条记录,但我正在努力了解如何只重试失败的消息。 put_records
方法的响应似乎不是很有用。
我可以相信响应记录列表的顺序与我传递给 putRecords 的列表的顺序相同吗?
我知道我可以增加分片的数量,但我想显着增加将数据加载到此 Kinesis 流的并行 Lambda 函数的数量。我们计划根据源系统对数据进行分区,我不能保证多个函数不会将数据写入同一个分片并超过允许的吞吐量。因此,我认为增加分片不会消除对重试策略的需求。
或者,有人知道 KPL 是否会自动为我处理这个问题吗?
Can I rely on the order of the response Records list being in the same order as the list I pass to putRecords?
是的。您将不得不依赖响应的顺序。响应记录的顺序与请求记录的顺序相同。
请检查 putrecords
回复,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html。
Records: An array of successfully and unsuccessfully processed record results, correlated with the request by natural ordering. A record that is successfully added to a stream includes SequenceNumber and ShardId in the result. A record that fails to be added to a stream includes ErrorCode and ErrorMessage in the result.
要重试失败的记录,您必须开发自己的重试机制。我在 python 中使用递归函数编写了重试机制,并按以下方式在重试之间进行增量等待。
import boto3
import time
kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"
def send_to_stream(kinesis_records, retry_count):
put_response = kinesis_client.put_records(
Records=kinesis_records,
StreamName=KINESIS_STREAM_NAME
)
failed_count = put_response['FailedRecordCount']
if failed_count > 0:
if retry_count > 0:
retry_kinesis_records = []
for idx, record in enumerate(put_response['Records']):
if 'ErrorCode' in record:
retry_kinesis_records.append(kinesis_records[idx])
time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
send_to_stream(retry_kinesis_records, retry_count - 1)
else:
print(f'Not able to put records after retries. Records = {put_response["Records"]}')
在上面的示例中,您可以根据需要更改 KINESIS_RETRY_COUNT
和 KINESIS_RETRY_WAIT_IN_SEC
。此外,您还必须确保您的 lambda 超时足以重试。
Alternatively, does anybody know if KPL will automatically handle this issue for me?
我不确定 KPL,但从文档来看它似乎有自己的重试机制。 https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html
虽然您绝对应该处理失败并重新发送它们,但将要重新发送的额外记录数量降至最低的一种方法是简单地发送 500 条记录,如果您要发送更多记录,请延迟 500 毫秒,然后再发送下一批.
每 500 条记录等待 500 毫秒会将您限制为 1000 records/sec,这是 Kinesis PutRecords 限制。保持在这个限制之下将最大限度地减少必须多次发送的记录数。
一次只处理较大列表中的 500 条记录也可以使重试逻辑更容易,因为任何失败的记录都可以简单地附加到主列表的末尾,循环时将在此处重试检查主列表中是否还有要发送给 Kinesis 的记录。
如果主列表在每次尝试发送 500 条记录时都没有变小,请记住检查以中止,如果每次至少有一条记录失败,就会发生这种情况。最终它将成为列表中的最后一个,并将永远不断地发送,除非进行此检查。
请注意,这适用于一个分片,如果您有更多分片,则可以相应地调整这些限制。