AWS step-function mapState 遍历大负载
AWS step-function mapState iterate over large payloads
我有一个状态机,由第一个预处理任务组成,该任务生成一个数组作为输出,后续映射状态使用该数组进行循环。第一个任务的输出数组变得太大,状态机抛出错误 States.DataLimitExceeded
: The state/task 'arn:aws:lambda:XYZ' returned a result with a size exceeding the maximum number of characters service limit.
这是状态机 yaml 的示例:
stateMachines:
myStateMachine:
name: "myStateMachine"
definition:
StartAt: preProcess
States:
preProcess:
Type: Task
Resource:
Fn::GetAtt: [preProcessLambda, Arn]
Next: mapState
ResultPath: "$.preProcessOutput"
mapState:
Type: Map
ItemsPath: "$.preProcessOutput.data"
MaxConcurrency: 100
Iterator:
StartAt: doMap
States:
doMap:
Type: Task
Resource:
Fn::GetAtt: [doMapLambda, Arn]
End: true
Next: ### next steps, not relevant
我想到的一个可能的解决方案是状态 preProcess
将其输出保存在 S3 存储桶中,状态 mapState
直接从中读取。这可能吗?目前 preProcess
的输出是
ResultPath: "$.preProcessOutput"
和mapState
取数组
ItemsPath: "$.preProcessOutput.data"
作为输入。
我需要如何调整地图状态直接从 S3 读取的 yaml?
我认为目前无法直接从S3 读取。您可以尝试做一些事情来绕过此限制。一种是制作您自己的迭代器而不是使用 Map State。另一个如下:
让 lambda 读取您的 s3 文件并按索引或某些 id/key 将其分块。这一步背后的想法是通过 Map State 中的迭代器传递一个更小的有效载荷。假设您的数据具有以下结构。
[ { idx: 1, ...more keys }, {idx: 2, ...more keys }, { idx: 3, ...more keys }, ... 4,997 more objects of data ]
假设您希望迭代器一次处理 1,000 行。 Return 以下元组表示您的 lambda 中的索引:[ [ 0, 999 ], [ 1000, 1999 ], [ 2000, 2999 ], [ 3000, 3999 ], [ 4000, 4999] ]
您的地图状态将接收此新数据结构,并且每次迭代都将是元组之一。迭代 #1:[ 0, 999 ],迭代 #2:[ 1000, 1999 ],等等
在您的迭代器中,调用一个使用元组索引查询您的 S3 文件的 lambda。 AWS 在 S3 存储桶上有一种名为 Amazon S3 Select
的查询语言:https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-select.html
这是另一个关于如何使用 S3 select 并使用节点将数据置于可读状态的重要资源:https://thetrevorharmon.com/blog/how-to-use-s3-select-to-query-json-in-node-js
因此,对于迭代 #1,我们查询数据结构中的前 1,000 个对象。我现在可以调用我通常在迭代器中拥有的任何函数。
这种方法的关键是 inputPath 永远不会接收大型数据结构。
我目前也在解决类似的问题。由于阶跃函数存储其整个状态,因此随着 json 映射到所有值而增长,您很快就会遇到问题。
解决这个问题的唯一真正方法是使用阶梯函数的层次结构。也就是说,您的步骤功能上的步骤功能。所以你有:
parent -> [batch1, batch2, batch...N]
然后每个batch有多个single jobs:
batch1 -> [j1,j2,j3...jBATCHSIZE]
我有一个非常简单的步进函数,我发现 ~4k
大约是在我开始达到状态限制之前我可以拥有的最大批量大小。
不是一个很好的解决方案,嘿,它有效。
截至 2020 年 9 月,步函数的限制已增加 8 倍
也许现在它符合您的要求
写这篇文章以防其他人遇到这个问题 - 我最近也不得不在工作中解决这个问题。我找到了我认为是一个相对简单的解决方案,没有使用第二步功能。
我正在为此使用 Python,并将在 Python 中提供一些示例,但该解决方案应该适用于任何语言。
假设预处理输出如下所示:
[
{Output_1},
{Output_2},
.
.
.
{Output_n}
]
而Step Function部分的简化版定义如下:
"PreProcess": {
"Type": "Task",
"Resource": "Your Resource ARN",
"Next": "Map State"
},
"Map State": {
Do a bunch of stuff
}
处理 PreProcess 输出超过 Step Functions 负载的情况:
在 PreProcess 内部,将输出批处理成小到不超过有效载荷的块。
这是最复杂的一步。您将需要进行一些试验以找到单个批次的最大尺寸。一旦你有了数字(让这个数字动态化可能很聪明),我使用 numpy 将原始的 PreProcess 输出拆分为批次数。
import numpy as np
batches = np.array_split(original_pre_process_output, number_of_batches)
再次在 PreProcess 中,将每个批次上传到 Amazon S3,将密钥保存在新列表中。此 S3 密钥列表将成为新的 PreProcess 输出。
在 Python 中,看起来像这样:
import json
import boto3
s3 = boto3.resource('s3')
batch_keys = []
for batch in batches:
s3_batch_key = 'Your S3 Key here'
s3.Bucket(YOUR_BUCKET).put_object(Key=s3_batch_key, Body=json.dumps(batch))
batch_keys.append({'batch_key': s3_batch_key})
在我实施的解决方案中,我使用 for batch_id, batch in enumerate(batches)
轻松地为每个 S3 密钥指定了自己的 ID。
将 'Inner' 地图状态包装在 'Outer' 地图状态中,并在外部地图中创建 Lambda 函数以将批次提供给内部地图。
现在我们有一个由 S3 密钥组成的小输出,我们需要一种方法来一次打开一个,将每批输入原始(现在 'Inner')映射状态。
为此,首先创建一个新的 Lambda 函数 - 这将代表 BatchJobs
状态。接下来,将初始 Map 状态包装在 Outer map 中,如下所示:
"PreProcess": {
"Type": "Task",
"Resource": "Your Resource ARN",
"Next": "Outer Map"
},
"Outer Map": {
"Type": "Map",
"MaxConcurrency": 1,
"Next": "Original 'Next' used in the Inner map",
"Iterator": {
"StartAt": "BatchJobs",
"States": {
"BatchJobs": {
"Type": "Task",
"Resource": "Newly created Lambda Function ARN",
"Next": "Inner Map"
},
"Inner Map": {
Initial Map State, left as is.
}
}
}
}
请注意外部映射中的 'MaxConcurrency' 参数 - 这只是确保按顺序执行批处理。
使用这个新的 Step Function 定义,BatchJobs
状态将收到每个批次的 {'batch_key': s3_batch_key}
。然后 BatchJobs
状态只需要获取存储在键中的对象,并将其传递给 Inner Map。
在 Python 中,BatchJobs
Lambda 函数如下所示:
import json
import boto3
s3 = boto3.client('s3')
def batch_jobs_handler(event, context):
return json.loads(s3.get_object(Bucket='YOUR_BUCKET_HERE',
Key=event.get('batch_key'))['Body'].read().decode('utf-8'))
更新您的工作流程以处理输出的新结构。
在实施此解决方案之前,您的 Map 状态输出一个输出数组:
[
{Map_output_1},
{Map_output_2},
.
.
.
{Map_output_n}
]
使用此解决方案,您现在将获得一个列表列表,每个内部列表包含每个批次的结果:
[
[
{Batch_1_output_1},
{Batch_1_output_2},
.
.
.
{Batch_1_output_n}
],
[
{Batch_2_output_1},
{Batch_2_output_2},
.
.
.
{Batch_2_output_n}
],
.
.
.
[
{Batch_n_output_1},
{Batch_n_output_2},
.
.
.
{Batch_n_output_n}
]
]
根据您的需要,您可能需要在 Map 之后调整一些代码以处理新的输出格式。
就是这样!只要您正确设置了最大批处理大小,达到有效负载限制的唯一方法就是您的 S3 密钥列表超过有效负载限制。
建议的解决方法适用于特定场景,但处理正常有效负载会生成大量可能超过有效负载限制的项目列表。
在一般情况下,我认为问题可以在场景 1->N 中重复出现。我的意思是当一个步骤可能会在工作流程中生成许多步骤执行时。
打破某些任务复杂性的一种明确方法是将其分成许多其他任务,因此很可能需要很多次。同样从可伸缩性的角度来看,这样做有一个明显的优势,因为将大计算分解成小计算的次数越多,粒度就越大,可以进行更多的并行处理和优化。
这就是 AWS increasing the max payload size 的目的。他们称之为 动态并行。
问题是 Map 状态是那个的 corner-stone。除了服务集成(数据库查询等)之外,它是唯一可以从一个步骤中动态派生出许多任务的集成。但是似乎没有办法指定有效负载在文件上。
我看到一个快速解决问题的方法是,如果他们在每个步骤中添加一个可选的持久性规范,例如:
stateMachines:
myStateMachine:
name: "myStateMachine"
definition:
StartAt: preProcess
States:
preProcess:
Type: Task
Resource:
Fn::GetAtt: [preProcessLambda, Arn]
Next: mapState
ResultPath: "$.preProcessOutput"
OutputFormat:
S3:
Bucket: myBucket
Compression:
Format: gzip
mapState:
Type: Map
ItemsPath: "$.preProcessOutput.data"
InputFormat:
S3:
Bucket: myBucket
Compression:
Format: gzip
MaxConcurrency: 100
Iterator:
StartAt: doMap
States:
doMap:
Type: Task
Resource:
Fn::GetAtt: [doMapLambda, Arn]
End: true
Next: ### next steps, not relevant
这样 Map 甚至可以在大负载上执行其工作。
我有一个状态机,由第一个预处理任务组成,该任务生成一个数组作为输出,后续映射状态使用该数组进行循环。第一个任务的输出数组变得太大,状态机抛出错误 States.DataLimitExceeded
: The state/task 'arn:aws:lambda:XYZ' returned a result with a size exceeding the maximum number of characters service limit.
这是状态机 yaml 的示例:
stateMachines:
myStateMachine:
name: "myStateMachine"
definition:
StartAt: preProcess
States:
preProcess:
Type: Task
Resource:
Fn::GetAtt: [preProcessLambda, Arn]
Next: mapState
ResultPath: "$.preProcessOutput"
mapState:
Type: Map
ItemsPath: "$.preProcessOutput.data"
MaxConcurrency: 100
Iterator:
StartAt: doMap
States:
doMap:
Type: Task
Resource:
Fn::GetAtt: [doMapLambda, Arn]
End: true
Next: ### next steps, not relevant
我想到的一个可能的解决方案是状态 preProcess
将其输出保存在 S3 存储桶中,状态 mapState
直接从中读取。这可能吗?目前 preProcess
的输出是
ResultPath: "$.preProcessOutput"
和mapState
取数组
ItemsPath: "$.preProcessOutput.data"
作为输入。
我需要如何调整地图状态直接从 S3 读取的 yaml?
我认为目前无法直接从S3 读取。您可以尝试做一些事情来绕过此限制。一种是制作您自己的迭代器而不是使用 Map State。另一个如下:
让 lambda 读取您的 s3 文件并按索引或某些 id/key 将其分块。这一步背后的想法是通过 Map State 中的迭代器传递一个更小的有效载荷。假设您的数据具有以下结构。
[ { idx: 1, ...more keys }, {idx: 2, ...more keys }, { idx: 3, ...more keys }, ... 4,997 more objects of data ]
假设您希望迭代器一次处理 1,000 行。 Return 以下元组表示您的 lambda 中的索引:[ [ 0, 999 ], [ 1000, 1999 ], [ 2000, 2999 ], [ 3000, 3999 ], [ 4000, 4999] ]
您的地图状态将接收此新数据结构,并且每次迭代都将是元组之一。迭代 #1:[ 0, 999 ],迭代 #2:[ 1000, 1999 ],等等
在您的迭代器中,调用一个使用元组索引查询您的 S3 文件的 lambda。 AWS 在 S3 存储桶上有一种名为 Amazon S3 Select
的查询语言:https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-select.html
这是另一个关于如何使用 S3 select 并使用节点将数据置于可读状态的重要资源:https://thetrevorharmon.com/blog/how-to-use-s3-select-to-query-json-in-node-js
因此,对于迭代 #1,我们查询数据结构中的前 1,000 个对象。我现在可以调用我通常在迭代器中拥有的任何函数。
这种方法的关键是 inputPath 永远不会接收大型数据结构。
我目前也在解决类似的问题。由于阶跃函数存储其整个状态,因此随着 json 映射到所有值而增长,您很快就会遇到问题。
解决这个问题的唯一真正方法是使用阶梯函数的层次结构。也就是说,您的步骤功能上的步骤功能。所以你有:
parent -> [batch1, batch2, batch...N]
然后每个batch有多个single jobs:
batch1 -> [j1,j2,j3...jBATCHSIZE]
我有一个非常简单的步进函数,我发现 ~4k
大约是在我开始达到状态限制之前我可以拥有的最大批量大小。
不是一个很好的解决方案,嘿,它有效。
截至 2020 年 9 月,步函数的限制已增加 8 倍
也许现在它符合您的要求
写这篇文章以防其他人遇到这个问题 - 我最近也不得不在工作中解决这个问题。我找到了我认为是一个相对简单的解决方案,没有使用第二步功能。
我正在为此使用 Python,并将在 Python 中提供一些示例,但该解决方案应该适用于任何语言。
假设预处理输出如下所示:
[
{Output_1},
{Output_2},
.
.
.
{Output_n}
]
而Step Function部分的简化版定义如下:
"PreProcess": {
"Type": "Task",
"Resource": "Your Resource ARN",
"Next": "Map State"
},
"Map State": {
Do a bunch of stuff
}
处理 PreProcess 输出超过 Step Functions 负载的情况:
在 PreProcess 内部,将输出批处理成小到不超过有效载荷的块。
这是最复杂的一步。您将需要进行一些试验以找到单个批次的最大尺寸。一旦你有了数字(让这个数字动态化可能很聪明),我使用 numpy 将原始的 PreProcess 输出拆分为批次数。
import numpy as np batches = np.array_split(original_pre_process_output, number_of_batches)
再次在 PreProcess 中,将每个批次上传到 Amazon S3,将密钥保存在新列表中。此 S3 密钥列表将成为新的 PreProcess 输出。
在 Python 中,看起来像这样:
import json import boto3 s3 = boto3.resource('s3') batch_keys = [] for batch in batches: s3_batch_key = 'Your S3 Key here' s3.Bucket(YOUR_BUCKET).put_object(Key=s3_batch_key, Body=json.dumps(batch)) batch_keys.append({'batch_key': s3_batch_key})
在我实施的解决方案中,我使用
for batch_id, batch in enumerate(batches)
轻松地为每个 S3 密钥指定了自己的 ID。将 'Inner' 地图状态包装在 'Outer' 地图状态中,并在外部地图中创建 Lambda 函数以将批次提供给内部地图。
现在我们有一个由 S3 密钥组成的小输出,我们需要一种方法来一次打开一个,将每批输入原始(现在 'Inner')映射状态。
为此,首先创建一个新的 Lambda 函数 - 这将代表
BatchJobs
状态。接下来,将初始 Map 状态包装在 Outer map 中,如下所示:"PreProcess": { "Type": "Task", "Resource": "Your Resource ARN", "Next": "Outer Map" }, "Outer Map": { "Type": "Map", "MaxConcurrency": 1, "Next": "Original 'Next' used in the Inner map", "Iterator": { "StartAt": "BatchJobs", "States": { "BatchJobs": { "Type": "Task", "Resource": "Newly created Lambda Function ARN", "Next": "Inner Map" }, "Inner Map": { Initial Map State, left as is. } } } }
请注意外部映射中的 'MaxConcurrency' 参数 - 这只是确保按顺序执行批处理。
使用这个新的 Step Function 定义,
BatchJobs
状态将收到每个批次的{'batch_key': s3_batch_key}
。然后BatchJobs
状态只需要获取存储在键中的对象,并将其传递给 Inner Map。在 Python 中,
BatchJobs
Lambda 函数如下所示:import json import boto3 s3 = boto3.client('s3') def batch_jobs_handler(event, context): return json.loads(s3.get_object(Bucket='YOUR_BUCKET_HERE', Key=event.get('batch_key'))['Body'].read().decode('utf-8'))
更新您的工作流程以处理输出的新结构。
在实施此解决方案之前,您的 Map 状态输出一个输出数组:
[ {Map_output_1}, {Map_output_2}, . . . {Map_output_n} ]
使用此解决方案,您现在将获得一个列表列表,每个内部列表包含每个批次的结果:
[ [ {Batch_1_output_1}, {Batch_1_output_2}, . . . {Batch_1_output_n} ], [ {Batch_2_output_1}, {Batch_2_output_2}, . . . {Batch_2_output_n} ], . . . [ {Batch_n_output_1}, {Batch_n_output_2}, . . . {Batch_n_output_n} ] ]
根据您的需要,您可能需要在 Map 之后调整一些代码以处理新的输出格式。
就是这样!只要您正确设置了最大批处理大小,达到有效负载限制的唯一方法就是您的 S3 密钥列表超过有效负载限制。
建议的解决方法适用于特定场景,但处理正常有效负载会生成大量可能超过有效负载限制的项目列表。
在一般情况下,我认为问题可以在场景 1->N 中重复出现。我的意思是当一个步骤可能会在工作流程中生成许多步骤执行时。
打破某些任务复杂性的一种明确方法是将其分成许多其他任务,因此很可能需要很多次。同样从可伸缩性的角度来看,这样做有一个明显的优势,因为将大计算分解成小计算的次数越多,粒度就越大,可以进行更多的并行处理和优化。
这就是 AWS increasing the max payload size 的目的。他们称之为 动态并行。
问题是 Map 状态是那个的 corner-stone。除了服务集成(数据库查询等)之外,它是唯一可以从一个步骤中动态派生出许多任务的集成。但是似乎没有办法指定有效负载在文件上。
我看到一个快速解决问题的方法是,如果他们在每个步骤中添加一个可选的持久性规范,例如:
stateMachines:
myStateMachine:
name: "myStateMachine"
definition:
StartAt: preProcess
States:
preProcess:
Type: Task
Resource:
Fn::GetAtt: [preProcessLambda, Arn]
Next: mapState
ResultPath: "$.preProcessOutput"
OutputFormat:
S3:
Bucket: myBucket
Compression:
Format: gzip
mapState:
Type: Map
ItemsPath: "$.preProcessOutput.data"
InputFormat:
S3:
Bucket: myBucket
Compression:
Format: gzip
MaxConcurrency: 100
Iterator:
StartAt: doMap
States:
doMap:
Type: Task
Resource:
Fn::GetAtt: [doMapLambda, Arn]
End: true
Next: ### next steps, not relevant
这样 Map 甚至可以在大负载上执行其工作。