Amazon Kinesis 和 AWS Lambda 重试
Amazon Kinesis & AWS Lambda Retries
我是 Amazon Kinesis 的新手,所以也许这只是我理解中的一个问题,但在 AWS Lambda FAQ 中它说:
The Amazon Kinesis and DynamoDB Streams records sent to your AWS Lambda function are strictly serialized, per shard. This means that if you put two records in the same shard, Lambda guarantees that your Lambda function will be successfully invoked with the first record before it is invoked with the second record. If the invocation for one record times out, is throttled, or encounters any other error, Lambda will retry until it succeeds (or the record reaches its 24-hour expiration) before moving on to the next record. The ordering of records across different shards is not guaranteed, and processing of each shard happens in parallel.
我的问题是,如果生产者出于某种原因将一些格式错误的数据放入分片,并且当 Lambda 函数拾取它时出错,然后不断重试,会发生什么情况?这意味着该特定分片的处理将被错误阻止 24 小时。
处理应用程序错误的最佳做法是将问题包装在自定义错误中,然后将此错误与所有成功处理的记录一起发送到下游并让使用者处理吗?当然,如果出现不可恢复的错误,导致程序像空指针一样崩溃,这仍然无济于事:我们将在接下来的 24 小时内再次回到阻塞重试循环。
别想太多,Kinesis只是一个队列。您必须成功使用一条记录(即从队列中弹出)才能继续下一条记录。就像一个先进先出堆栈。
适当的方法应该是:
- 从流中获取一条记录。
- 在 try-catch-finally 块中处理它。
- 如果记录处理成功,没有问题。 <-尝试
- 但是如果失败了,记下来到别的地方去调查
失败的原因。 <-抓住
- 并且在你的逻辑块的末尾,始终坚持这个位置
发电机。 <- 最后
- 如果您的系统出现内部问题(内存错误、硬件错误
等)那是另一回事了;因为它可能会影响处理所有的
记录,不止一个
顺便说一句,如果一条记录的处理时间超过 1 分钟,很明显您做错了什么。由于 Kinesis 旨在每秒处理数千条记录,因此您不应该为每条记录处理如此长的作业。
您问的问题是队列系统的一般问题,有时称为"poisonous message"。为了安全起见,您必须在业务逻辑中处理它们。
http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages
这是关于在 Kinesis 中处理事件的常见问题,我将尝试为您提供一些要点,以构建您的 Lambda 函数来处理 "corrupted" 数据的此类问题。由于最好的做法是将写入 Kinesis 流的系统部分与从 Kinesis 流读取的其他部分分开,因此您经常会遇到此类问题。
首先,为什么会出现这样的问题事件?
使用 Kinesis 处理您的事件是分解既进行前端处理(服务最终用户)又同时进行 time/code 后端处理(分析)的复杂系统的好方法事件),分为系统的两个 独立 部分。前端人员可以专注于他们的业务,而后端人员如果想添加功能来服务于他们的分析用例,则无需将代码更改推送到前端。 Kinesis 是一个事件缓冲区,它既打破了同步的需要,又简化了业务逻辑代码。
因此,我们希望写入流的事件在其“schema”中具有灵活性,如果前端团队希望更改事件格式,请添加字段、删除字段、更改协议或加密密钥,他们应该能够随心所欲地进行这些操作。
现在需要从流中读取的团队能够以有效的方式处理此类灵活的事件,并且不会在每次发生此类更改时中断他们的处理。因此,您的 Lambda 函数看到它无法处理的事件应该很常见,而“poison-pill”并不像您预期的那样罕见。
其次,你们是如何处理此类问题的?
您的 Lambda 函数将获得 批处理 事件进行处理。请注意,您不应该一个接一个地获取事件,而应该是大批量的事件。如果您的批次太小,您很快就会在流中遇到很大的滞后。
对于每个批次,您将迭代事件、处理它们,然后在 DynamoDB 中检查批次的最后一个序列 ID。 Lambda 自动执行其中大部分步骤(请在此处查看更多信息:http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
这就是 "happy path" 中发生的情况,如果所有事件都没有任何问题地处理的话。但是,如果您在批处理中遇到任何问题并且您没有“提交”具有成功通知的事件,则批处理将失败并且您将再次获取批处理中的所有事件。
现在您需要确定处理失败的原因是什么。
临时 问题(节流、网络问题...) - 稍等片刻并重试几次即可。在许多情况下,问题会自行解决。
偶尔问题(内存不足...) - 最好增加 Lambda 函数的内存分配或减小批处理大小。在许多情况下,此类修改将解决问题。
Constant 失败 - 这意味着您必须忽略有问题的事件(将其放入 DLQ - 死信队列)或修改您的代码来处理它。
问题是识别代码中的失败类型并以不同方式处理它。您需要以某种方式编写您的 Lambda 代码以识别它(例如异常类型)并做出不同的反应。
您可以使用与 CloudWatch 的集成将此类故障写入控制台并创建相关警报。您也可以使用 CloudWatch Logs 来记录您的 "dead-letter-queue" 并查看问题的根源。
我是 Amazon Kinesis 的新手,所以也许这只是我理解中的一个问题,但在 AWS Lambda FAQ 中它说:
The Amazon Kinesis and DynamoDB Streams records sent to your AWS Lambda function are strictly serialized, per shard. This means that if you put two records in the same shard, Lambda guarantees that your Lambda function will be successfully invoked with the first record before it is invoked with the second record. If the invocation for one record times out, is throttled, or encounters any other error, Lambda will retry until it succeeds (or the record reaches its 24-hour expiration) before moving on to the next record. The ordering of records across different shards is not guaranteed, and processing of each shard happens in parallel.
我的问题是,如果生产者出于某种原因将一些格式错误的数据放入分片,并且当 Lambda 函数拾取它时出错,然后不断重试,会发生什么情况?这意味着该特定分片的处理将被错误阻止 24 小时。
处理应用程序错误的最佳做法是将问题包装在自定义错误中,然后将此错误与所有成功处理的记录一起发送到下游并让使用者处理吗?当然,如果出现不可恢复的错误,导致程序像空指针一样崩溃,这仍然无济于事:我们将在接下来的 24 小时内再次回到阻塞重试循环。
别想太多,Kinesis只是一个队列。您必须成功使用一条记录(即从队列中弹出)才能继续下一条记录。就像一个先进先出堆栈。
适当的方法应该是:
- 从流中获取一条记录。
- 在 try-catch-finally 块中处理它。
- 如果记录处理成功,没有问题。 <-尝试
- 但是如果失败了,记下来到别的地方去调查 失败的原因。 <-抓住
- 并且在你的逻辑块的末尾,始终坚持这个位置 发电机。 <- 最后
- 如果您的系统出现内部问题(内存错误、硬件错误 等)那是另一回事了;因为它可能会影响处理所有的 记录,不止一个
顺便说一句,如果一条记录的处理时间超过 1 分钟,很明显您做错了什么。由于 Kinesis 旨在每秒处理数千条记录,因此您不应该为每条记录处理如此长的作业。
您问的问题是队列系统的一般问题,有时称为"poisonous message"。为了安全起见,您必须在业务逻辑中处理它们。
http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages
这是关于在 Kinesis 中处理事件的常见问题,我将尝试为您提供一些要点,以构建您的 Lambda 函数来处理 "corrupted" 数据的此类问题。由于最好的做法是将写入 Kinesis 流的系统部分与从 Kinesis 流读取的其他部分分开,因此您经常会遇到此类问题。
首先,为什么会出现这样的问题事件?
使用 Kinesis 处理您的事件是分解既进行前端处理(服务最终用户)又同时进行 time/code 后端处理(分析)的复杂系统的好方法事件),分为系统的两个 独立 部分。前端人员可以专注于他们的业务,而后端人员如果想添加功能来服务于他们的分析用例,则无需将代码更改推送到前端。 Kinesis 是一个事件缓冲区,它既打破了同步的需要,又简化了业务逻辑代码。
因此,我们希望写入流的事件在其“schema”中具有灵活性,如果前端团队希望更改事件格式,请添加字段、删除字段、更改协议或加密密钥,他们应该能够随心所欲地进行这些操作。
现在需要从流中读取的团队能够以有效的方式处理此类灵活的事件,并且不会在每次发生此类更改时中断他们的处理。因此,您的 Lambda 函数看到它无法处理的事件应该很常见,而“poison-pill”并不像您预期的那样罕见。
其次,你们是如何处理此类问题的?
您的 Lambda 函数将获得 批处理 事件进行处理。请注意,您不应该一个接一个地获取事件,而应该是大批量的事件。如果您的批次太小,您很快就会在流中遇到很大的滞后。
对于每个批次,您将迭代事件、处理它们,然后在 DynamoDB 中检查批次的最后一个序列 ID。 Lambda 自动执行其中大部分步骤(请在此处查看更多信息:http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
这就是 "happy path" 中发生的情况,如果所有事件都没有任何问题地处理的话。但是,如果您在批处理中遇到任何问题并且您没有“提交”具有成功通知的事件,则批处理将失败并且您将再次获取批处理中的所有事件。
现在您需要确定处理失败的原因是什么。
临时 问题(节流、网络问题...) - 稍等片刻并重试几次即可。在许多情况下,问题会自行解决。
偶尔问题(内存不足...) - 最好增加 Lambda 函数的内存分配或减小批处理大小。在许多情况下,此类修改将解决问题。
Constant 失败 - 这意味着您必须忽略有问题的事件(将其放入 DLQ - 死信队列)或修改您的代码来处理它。
问题是识别代码中的失败类型并以不同方式处理它。您需要以某种方式编写您的 Lambda 代码以识别它(例如异常类型)并做出不同的反应。
您可以使用与 CloudWatch 的集成将此类故障写入控制台并创建相关警报。您也可以使用 CloudWatch Logs 来记录您的 "dead-letter-queue" 并查看问题的根源。