AWS step-function mapState 遍历大负载

AWS step-function mapState iterate over large payloads

我有一个状态机,由第一个预处理任务组成,该任务生成一个数组作为输出,后续映射状态使用该数组进行循环。第一个任务的输出数组变得太大,状态机抛出错误 States.DataLimitExceeded: The state/task 'arn:aws:lambda:XYZ' returned a result with a size exceeding the maximum number of characters service limit.

这是状态机 yaml 的示例:

stateMachines:
  myStateMachine:
    name: "myStateMachine"
    definition:
      StartAt: preProcess
      States:
        preProcess:
          Type: Task
          Resource:
            Fn::GetAtt: [preProcessLambda, Arn]
          Next: mapState
          ResultPath: "$.preProcessOutput"
        mapState:
          Type: Map
          ItemsPath: "$.preProcessOutput.data"
          MaxConcurrency: 100
          Iterator:
            StartAt: doMap
            States:
              doMap:
                Type: Task
                Resource:
                  Fn::GetAtt: [doMapLambda, Arn]
                End: true
          Next: ### next steps, not relevant

我想到的一个可能的解决方案是状态 preProcess 将其输出保存在 S3 存储桶中,状态 mapState 直接从中读取。这可能吗?目前 preProcess 的输出是

ResultPath: "$.preProcessOutput"

mapState取数组

ItemsPath: "$.preProcessOutput.data" 作为输入。

我需要如何调整地图状态直接从 S3 读取的 yaml?

我认为目前无法直接从S3 读取。您可以尝试做一些事情来绕过此限制。一种是制作您自己的迭代器而不是使用 Map State。另一个如下:

让 lambda 读取您的 s3 文件并按索引或某些 id/key 将其分块。这一步背后的想法是通过 Map State 中的迭代器传递一个更小的有效载荷。假设您的数据具有以下结构。

[ { idx: 1, ...more keys }, {idx: 2, ...more keys }, { idx: 3, ...more keys }, ... 4,997 more objects of data ]

假设您希望迭代器一次处理 1,000 行。 Return 以下元组表示您的 lambda 中的索引:[ [ 0, 999 ], [ 1000, 1999 ], [ 2000, 2999 ], [ 3000, 3999 ], [ 4000, 4999] ]

您的地图状态将接收此新数据结构,并且每次迭代都将是元组之一。迭代 #1:[ 0, 999 ],迭代 #2:[ 1000, 1999 ],等等

在您的迭代器中,调用一个使用元组索引查询您的 S3 文件的 lambda。 AWS 在 S3 存储桶上有一种名为 Amazon S3 Select 的查询语言:https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-select.html

这是另一个关于如何使用 S3 select 并使用节点将数据置于可读状态的重要资源:https://thetrevorharmon.com/blog/how-to-use-s3-select-to-query-json-in-node-js

因此,对于迭代 #1,我们查询数据结构中的前 1,000 个对象。我现在可以调用我通常在迭代器中拥有的任何函数。

这种方法的关键是 inputPath 永远不会接收大型数据结构。

我目前也在解决类似的问题。由于阶跃函数存储其整个状态,因此随着 json 映射到所有值而增长,您很快就会遇到问题。

解决这个问题的唯一真正方法是使用阶梯函数的层次结构。也就是说,您的步骤功能上的步骤功能。所以你有:

parent -> [batch1, batch2, batch...N]

然后每个batch有多个single jobs:

batch1 -> [j1,j2,j3...jBATCHSIZE]

我有一个非常简单的步进函数,我发现 ~4k 大约是在我开始达到状态限制之前我可以拥有的最大批量大小。

不是一个很好的解决方案,嘿,它有效。

截至 2020 年 9 月,步函数的限制已增加 8 倍

https://aws.amazon.com/about-aws/whats-new/2020/09/aws-step-functions-increases-payload-size-to-256kb/

也许现在它符合您的要求

写这篇文章以防其他人遇到这个问题 - 我最近也不得不在工作中解决这个问题。我找到了我认为是一个相对简单的解决方案,没有使用第二步功能。

我正在为此使用 Python,并将在 Python 中提供一些示例,但该解决方案应该适用于任何语言。

假设预处理输出如下所示:

[
    {Output_1},
    {Output_2},
    .
    .
    .
    {Output_n}
]

而Step Function部分的简化版定义如下:

"PreProcess": {
    "Type": "Task",
    "Resource": "Your Resource ARN",
    "Next": "Map State"
},
"Map State": {
    Do a bunch of stuff
}

处理 PreProcess 输出超过 Step Functions 负载的情况:

  1. 在 PreProcess 内部,将输出批处理成小到不超过有效载荷的块。

    这是最复杂的一步。您将需要进行一些试验以找到单个批次的最大尺寸。一旦你有了数字(让这个数字动态化可能很聪明),我使用 numpy 将原始的 PreProcess 输出拆分为批次数。

    import numpy as np
    batches = np.array_split(original_pre_process_output, number_of_batches)
    
  2. 再次在 PreProcess 中,将每个批次上传到 Amazon S3,将密钥保存在新列表中。此 S3 密钥列表将成为新的 PreProcess 输出。

    在 Python 中,看起来像这样:

    import json
    import boto3
    
    s3 = boto3.resource('s3')
    
    batch_keys = []
    for batch in batches:
        s3_batch_key = 'Your S3 Key here'
        s3.Bucket(YOUR_BUCKET).put_object(Key=s3_batch_key, Body=json.dumps(batch))
        batch_keys.append({'batch_key': s3_batch_key})
    

    在我实施的解决方案中,我使用 for batch_id, batch in enumerate(batches) 轻松地为每个 S3 密钥指定了自己的 ID。

  3. 将 'Inner' 地图状态包装在 'Outer' 地图状态中,并在外部地图中创建 Lambda 函数以将批次提供给内部地图。

    现在我们有一个由 S3 密钥组成的小输出,我们需要一种方法来一次打开一个,将每批输入原始(现在 'Inner')映射状态。

    为此,首先创建一个新的 Lambda 函数 - 这将代表 BatchJobs 状态。接下来,将初始 Map 状态包装在 Outer map 中,如下所示:

    "PreProcess": {
    "Type": "Task",
    "Resource": "Your Resource ARN",
    "Next": "Outer Map"
    },
    "Outer Map": {
        "Type": "Map",
        "MaxConcurrency": 1,
        "Next": "Original 'Next' used in the Inner map",
        "Iterator": {
            "StartAt": "BatchJobs",
            "States": {
                "BatchJobs": {
                    "Type": "Task",
                    "Resource": "Newly created Lambda Function ARN",
                    "Next": "Inner Map"   
                },
                "Inner Map": {
                     Initial Map State, left as is.
                }
            }
        }
    }
    

    请注意外部映射中的 'MaxConcurrency' 参数 - 这只是确保按顺序执行批处理。

    使用这个新的 Step Function 定义,BatchJobs 状态将收到每个批次的 {'batch_key': s3_batch_key}。然后 BatchJobs 状态只需要获取存储在键中的对象,并将其传递给 Inner Map。

    在 Python 中,BatchJobs Lambda 函数如下所示:

    import json
    import boto3
    
    s3 = boto3.client('s3')
    
    def batch_jobs_handler(event, context):
        return json.loads(s3.get_object(Bucket='YOUR_BUCKET_HERE',
                                        Key=event.get('batch_key'))['Body'].read().decode('utf-8'))
    
  4. 更新您的工作流程以处理输出的新结构。

    在实施此解决方案之前,您的 Map 状态输出一个输出数组:

    [
        {Map_output_1},
        {Map_output_2},
        .
        .
        .
        {Map_output_n}
    ]
    

    使用此解决方案,您现在将获得一个列表列表,每个内部列表包含每个批次的结果:

    [
        [
            {Batch_1_output_1},
            {Batch_1_output_2},
            .
            .
            .
            {Batch_1_output_n}
        ],
        [
            {Batch_2_output_1},
            {Batch_2_output_2},
            .
            .
            .
            {Batch_2_output_n}
        ],
        .
        .
        .
        [
            {Batch_n_output_1},
            {Batch_n_output_2},
            .
            .
            .
            {Batch_n_output_n}
        ]
    ]
    

    根据您的需要,您可能需要在 Map 之后调整一些代码以处理新的输出格式。

就是这样!只要您正确设置了最大批处理大小,达到有效负载限制的唯一方法就是您的 S3 密钥列表超过有效负载限制。

建议的解决方法适用于特定场景,但处理正常有效负载会生成大量可能超过有效负载限制的项目列表。

在一般情况下,我认为问题可以在场景 1->N 中重复出现。我的意思是当一个步骤可能会在工作流程中生成许多步骤执行时。

打破某些任务复杂性的一种明确方法是将其分成许多其他任务,因此很可能需要很多次。同样从可伸缩性的角度来看,这样做有一个明显的优势,因为将大计算分解成小计算的次数越多,粒度就越大,可以进行更多的并行处理和优化。

这就是 AWS increasing the max payload size 的目的。他们称之为 动态并行

问题是 Map 状态是那个的 corner-stone。除了服务集成(数据库查询等)之外,它是唯一可以从一个步骤中动态派生出许多任务的集成。但是似乎没有办法指定有效负载在文件上。

我看到一个快速解决问题的方法是,如果他们在每个步骤中添加一个可选的持久性规范,例如:

stateMachines:
  myStateMachine:
    name: "myStateMachine"
    definition:
      StartAt: preProcess
      States:
        preProcess:
          Type: Task
          Resource:
            Fn::GetAtt: [preProcessLambda, Arn]
          Next: mapState
          ResultPath: "$.preProcessOutput"
          OutputFormat:
             S3:
                Bucket: myBucket
             Compression:
                Format: gzip
        mapState:
          Type: Map
          ItemsPath: "$.preProcessOutput.data"
          InputFormat:
             S3:
                Bucket: myBucket
             Compression:
                Format: gzip
          MaxConcurrency: 100
          Iterator:
            StartAt: doMap
            States:
              doMap:
                Type: Task
                Resource:
                  Fn::GetAtt: [doMapLambda, Arn]
                End: true
          Next: ### next steps, not relevant

这样 Map 甚至可以在大负载上执行其工作。