如何使用 lambda 函数 (A) 的 n 个结果调用 Python 中另一个 lambda 处理器函数 (B) 的 n 个并发实例?

How to use n results from a lambda function (A) to invoke n concurrent instances of another lambda processor function (B) in Python?

我有一个 AWS Lambda 函数 (A) returns n urls。我想将每个 url 作为参数单独并同时传递给另一个 AWS Lambda 函数 (B)。函数 B 然后处理传递的 url 和 returns 结果。这两个函数都是用 Python 编写的,如果可能的话,我宁愿避免使用其他语言。有没有人有一个明确的解决方案来解决超时、并发违规和其他边缘情况 and/or 错误?

即使分配了最大内存,函数 A 也需要约 85 秒才能设置有效负载并调用函数 B 1,100 次。调用另一个 AWS Lambda 函数通常需要大约 80 毫秒吗?有没有更快的方法?此外,函数 B 的 CloudWatch Logs 将多个日志流中的调用分开,这使得很难在一个地方看到所有调用以确认事情是否正确完成 and/or 以什么顺序 and/or 其中任何 errors/delays 可能位于。

我看过 boto3.client('lambda') docs

我还利用 and 获取我现有的代码。

这是我用于测试的代码。

# Function A - using max Memory setting (3008 MB currently) to speed things up

import boto3
import json

def lambda_handler(event, context):
    #simulate 1,100 urls (more than the default concurrency limit of 1,000)
    n = 1100
    results = range(1, n+1)
    #invoke function B asynchronously
    for result in results:
        payload = {'url' : result}
        boto3.client('lambda').invoke(FunctionName='B', InvocationType='Event', Payload=json.dumps(payload))
    return{'statusCode': 200, 'body': json.dumps('Hello from Lambda!')}
# Function B - using the min Memory setting (128 MB currently)

import json
import time

def lambda_handler(event, context):
    #wait 5 seconds to simulate processing time
    time.sleep(5)
    #process passed payload from function A
    print(event['url'])
    return{'statusCode': 200, 'body': json.dumps('Bye from Lambda!')}


Is ~80ms typical to invoke another AWS Lambda function?

我觉得这听起来还不错,但可能还有一些改进的余地。在查看您的代码时,我突然想到的一件事是您一遍又一遍地创建 AWS Lambda 客户端对象。尝试创建一次客户端,如下所示:

client = boto3.client('lambda')
for result in results:
        payload = {'url' : result}
        client.invoke(FunctionName='B', InvocationType='Event', Payload=json.dumps(payload))

通过重用相同的客户端对象,我认为由于重用了与 AWS API 服务器的底层 HTTP 连接,您会看到性能提升。

Additionally, the CloudWatch Logs for function B separate the invocations among multiple log streams making it hard to see all the invocations in 1 place to confirm if things were done properly and/or in what order and/or where any errors/delays may be located.

您在多个服务器上处理超过一千个异步进程运行。在一个地方查看所有这些日志将是一个挑战。您可能会考虑使用 CloudWatch Logs Insights.

之类的东西

Does anyone have a definitive solution that accounts for timeouts, concurrency violations, other edge cases and/or errors?

管理超时、并发限制和其他错误的典型模式是将所有事件发送到 SQS 队列和 let the queue trigger your second Lambda function。然而,虽然您的第一个 Lambda 函数将像现在一样快地完成,或者可能更快,

另一种可用于解决其中一些问题的模式是在您的第一个 Lambda 函数中实施 exponential backoff algorithm。但是,这将需要您的函数代码直接处理重试,而不是依靠 SQS 等其他 AWS 服务为您处理重试,并且需要在您的 Lambda 函数中添加暂停,这可能会导致第一个函数调用最终在它之前超时已成功触发所有第二个函数调用,这只会创建另一个您必须以某种方式处理的错误情况。