AWS 检查 StateMachines/StepFunctions 并发运行
AWS Checking StateMachines/StepFunctions concurrent runs
我在处理其中包含 GlueJob 任务的 StateMachine(Step Function)的并发运行时遇到了很多问题。
状态机由 Lambda 启动,该 Lambda 由 FIFO SQS 队列触发。
lambda 获取消息,检查状态机实例的数量 运行,如果该数量低于 GlueJob 并发运行阈值,它会启动状态机。
我遇到的问题是此检查大部分时间都失败了。尽管我的 GlueJob 没有足够的并发可用,但状态机启动了。显然,SQS 队列传递给 lambda 的消息得到处理,因此如果状态机由于这个原因而失败,则该消息将永远消失(除非我捕获异常并将新消息发送回队列)。
我认为此行为是由于我的 lambda 处理消息的速度过快(尽管它是一个 FIFO 队列,所以一次处理 1 条消息),而且我的检查器跟不上。
我在这里和那里实施了一些 time.sleep() 以查看情况是否有所好转,但没有实质性改进。
我想问一下您是否遇到过这样的问题,以及您是如何通过编程解决这些问题的。
提前致谢!
这是我的检查器:
def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):
if next_token:
response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
else:
response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')
cnt += len(response['executions'])
if cnt > 0 and 'nextToken' in response:
return get_running_workflows(cnt, response['nextToken'])
return cnt
编辑 24-01-2022:
这是基于@Mark-B 答案的实现。
我创建了一个 DynamoDB table 如下:
resources:
Resources:
concurrencyHandler:
Type: AWS::DynamoDB::Table
Properties:
TableName: concurrencyHandler
AttributeDefinitions:
- AttributeName: concurrencyProcess
AttributeType: S
KeySchema:
- AttributeName: concurrencyProcess
KeyType: HASH
BillingMode: PAY_PER_REQUEST
Primary Key 用来存放我处理并发的进程名。如果我想重用相同的 table 来处理其他进程的并发,这会有所帮助。
然后,在使用来自 SQS 的消息并触发我的 Step Function 的 lambda 中,我编写了一个带有条件检查的原子计数器,如下所示:
try:
response = concurrency_table.update_item(
Key={'concurrencyProcess': processName},
ExpressionAttributeNames={
'#C': 'concurrencyCount',
'#O': step_function_name,
},
ExpressionAttributeValues={
':start': 0,
':inc': 1,
':limit': int(glue_max_concurrent_runs),
':time': datetime.datetime.utcnow().isoformat()
},
UpdateExpression="SET #C = if_not_exists(#C, :start) + :inc, #O = :time",
ConditionExpression="#C < :limit AND attribute_not_exists(#O)",
ReturnValues="UPDATED_NEW"
)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
raise
else:
raise Exception("Either max Concurrency number reached or "
"Stepfunction is requesting second lock")
只要有新消息到达,此代码就会递增计数器。如果计数器 >= 比限制,它将失败。所以消息会被送回队列,稍后再处理。
显然,我已经在我的 StepFunction 工作流中设置了一个清理的最后一步,如果工作流成功,如果工作流因任何原因失败的话。这是 .yaml:
UpdateConcurrencyCountSuccess:
Type: Task
Resource: arn:aws:states:::dynamodb:updateItem
Parameters:
TableName: ${self:custom.concurrencyTable}
Key:
concurrencyProcess:
S: {processName}
ExpressionAttributeNames:
'#C': concurrencyCount
'#O.$': $$.Execution.Name
ExpressionAttributeValues:
:dec:
N: '1'
UpdateExpression: "SET #C = #C - :dec REMOVE #O"
ConditionExpression: attribute_exists(#O)
ReturnValues: UPDATED_NEW
Retry:
- ErrorEquals:
- DynamoDB.ConditionalCheckFailedException
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 20
BackoffRate: 1.4
ResultPath: $.update_concurrency_success
End: true
Catch:
- ErrorEquals:
- States.ALL
Next: PublishToSNSFailed
ResultPath: $.error
# Update Concurrency Count table, failed flow
UpdateConcurrencyCountFail:
Type: Task
Resource: arn:aws:states:::dynamodb:updateItem
Parameters:
TableName: ${self:custom.concurrencyTable}
Key:
concurrencyProcess:
S: {processName}
ExpressionAttributeNames:
'#C': concurrencyCount
'#O.$': $$.Execution.Name
ExpressionAttributeValues:
:dec:
N: '1'
UpdateExpression: "SET #C = #C - :dec REMOVE #O"
ConditionExpression: attribute_exists(#O)
ReturnValues: UPDATED_NEW
Retry:
- ErrorEquals:
- DynamoDB.ConditionalCheckFailedException
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 20
BackoffRate: 1.4
ResultPath: $.update_concurrency_failed
Next: ExecutionFailed
Catch:
- ErrorEquals:
- States.ALL
Next: ExecutionFailed
ResultPath: $.error
注意 DynamoDB 中的属性 #O
是 Step Function Name,并将 starting-execution-time 作为值。步骤函数执行的名称在我的 trigger-lambda 中生成(它是单义的)。在我的工作流程中,我可以通过 $$Execution.Name
访问该名称,以便在清理过程中删除该属性。
将 step 函数的唯一名称作为属性可以帮助调试,以防(尽管非常遥远)执行开始但无法减少计数器。
编辑 02-02-2022:
在对该解决方案进行压力测试后,我得出的结论是它无法保持状态机的计数运行:当最大值设置为 10 时,不知何故给出了第 11 个锁。
最重要的是,我必须说,Glue 在“SUCCESS”之后有某种不可见的“状态”,这导致并发性低于可用锁(例如:0 胶合并发可用,但 2/3 空闲锁).
希望 AWS 可以实现一些并发控制和从 SQS 到 StepFunction 的直接触发器
您将 运行 使用此方法遇到问题,因为启动新流程的调用可能不会立即导致 list_executions()
显示新号码。请求启动新工作流与实际启动工作流之间可能需要几秒钟。据我所知,list_executions()
API 调用没有 强一致性 保证。
您需要一些高度一致的东西,并且 DynamoDB atomic counters is a great solution for this problem. Amazon published a blog post 详细说明 DynamoDB 在这个具体场景中的使用。要点是您将尝试在 DynamoDB 中增加一个原子计数器,使用 limit
表达式,如果它会导致计数器超过某个值,该表达式会导致增加失败。捕获 failure/exception 是您的 Lambda 函数知道将消息发送回队列的方式。然后在工作流程结束时调用另一个 Lambda 函数来递减计数器。
我在处理其中包含 GlueJob 任务的 StateMachine(Step Function)的并发运行时遇到了很多问题。
状态机由 Lambda 启动,该 Lambda 由 FIFO SQS 队列触发。
lambda 获取消息,检查状态机实例的数量 运行,如果该数量低于 GlueJob 并发运行阈值,它会启动状态机。
我遇到的问题是此检查大部分时间都失败了。尽管我的 GlueJob 没有足够的并发可用,但状态机启动了。显然,SQS 队列传递给 lambda 的消息得到处理,因此如果状态机由于这个原因而失败,则该消息将永远消失(除非我捕获异常并将新消息发送回队列)。
我认为此行为是由于我的 lambda 处理消息的速度过快(尽管它是一个 FIFO 队列,所以一次处理 1 条消息),而且我的检查器跟不上。
我在这里和那里实施了一些 time.sleep() 以查看情况是否有所好转,但没有实质性改进。
我想问一下您是否遇到过这样的问题,以及您是如何通过编程解决这些问题的。
提前致谢!
这是我的检查器:
def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):
if next_token:
response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
else:
response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')
cnt += len(response['executions'])
if cnt > 0 and 'nextToken' in response:
return get_running_workflows(cnt, response['nextToken'])
return cnt
编辑 24-01-2022:
这是基于@Mark-B 答案的实现。
我创建了一个 DynamoDB table 如下:
resources:
Resources:
concurrencyHandler:
Type: AWS::DynamoDB::Table
Properties:
TableName: concurrencyHandler
AttributeDefinitions:
- AttributeName: concurrencyProcess
AttributeType: S
KeySchema:
- AttributeName: concurrencyProcess
KeyType: HASH
BillingMode: PAY_PER_REQUEST
Primary Key 用来存放我处理并发的进程名。如果我想重用相同的 table 来处理其他进程的并发,这会有所帮助。
然后,在使用来自 SQS 的消息并触发我的 Step Function 的 lambda 中,我编写了一个带有条件检查的原子计数器,如下所示:
try:
response = concurrency_table.update_item(
Key={'concurrencyProcess': processName},
ExpressionAttributeNames={
'#C': 'concurrencyCount',
'#O': step_function_name,
},
ExpressionAttributeValues={
':start': 0,
':inc': 1,
':limit': int(glue_max_concurrent_runs),
':time': datetime.datetime.utcnow().isoformat()
},
UpdateExpression="SET #C = if_not_exists(#C, :start) + :inc, #O = :time",
ConditionExpression="#C < :limit AND attribute_not_exists(#O)",
ReturnValues="UPDATED_NEW"
)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
raise
else:
raise Exception("Either max Concurrency number reached or "
"Stepfunction is requesting second lock")
只要有新消息到达,此代码就会递增计数器。如果计数器 >= 比限制,它将失败。所以消息会被送回队列,稍后再处理。
显然,我已经在我的 StepFunction 工作流中设置了一个清理的最后一步,如果工作流成功,如果工作流因任何原因失败的话。这是 .yaml:
UpdateConcurrencyCountSuccess:
Type: Task
Resource: arn:aws:states:::dynamodb:updateItem
Parameters:
TableName: ${self:custom.concurrencyTable}
Key:
concurrencyProcess:
S: {processName}
ExpressionAttributeNames:
'#C': concurrencyCount
'#O.$': $$.Execution.Name
ExpressionAttributeValues:
:dec:
N: '1'
UpdateExpression: "SET #C = #C - :dec REMOVE #O"
ConditionExpression: attribute_exists(#O)
ReturnValues: UPDATED_NEW
Retry:
- ErrorEquals:
- DynamoDB.ConditionalCheckFailedException
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 20
BackoffRate: 1.4
ResultPath: $.update_concurrency_success
End: true
Catch:
- ErrorEquals:
- States.ALL
Next: PublishToSNSFailed
ResultPath: $.error
# Update Concurrency Count table, failed flow
UpdateConcurrencyCountFail:
Type: Task
Resource: arn:aws:states:::dynamodb:updateItem
Parameters:
TableName: ${self:custom.concurrencyTable}
Key:
concurrencyProcess:
S: {processName}
ExpressionAttributeNames:
'#C': concurrencyCount
'#O.$': $$.Execution.Name
ExpressionAttributeValues:
:dec:
N: '1'
UpdateExpression: "SET #C = #C - :dec REMOVE #O"
ConditionExpression: attribute_exists(#O)
ReturnValues: UPDATED_NEW
Retry:
- ErrorEquals:
- DynamoDB.ConditionalCheckFailedException
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 20
BackoffRate: 1.4
ResultPath: $.update_concurrency_failed
Next: ExecutionFailed
Catch:
- ErrorEquals:
- States.ALL
Next: ExecutionFailed
ResultPath: $.error
注意 DynamoDB 中的属性 #O
是 Step Function Name,并将 starting-execution-time 作为值。步骤函数执行的名称在我的 trigger-lambda 中生成(它是单义的)。在我的工作流程中,我可以通过 $$Execution.Name
访问该名称,以便在清理过程中删除该属性。
将 step 函数的唯一名称作为属性可以帮助调试,以防(尽管非常遥远)执行开始但无法减少计数器。
编辑 02-02-2022:
在对该解决方案进行压力测试后,我得出的结论是它无法保持状态机的计数运行:当最大值设置为 10 时,不知何故给出了第 11 个锁。
最重要的是,我必须说,Glue 在“SUCCESS”之后有某种不可见的“状态”,这导致并发性低于可用锁(例如:0 胶合并发可用,但 2/3 空闲锁).
希望 AWS 可以实现一些并发控制和从 SQS 到 StepFunction 的直接触发器
您将 运行 使用此方法遇到问题,因为启动新流程的调用可能不会立即导致 list_executions()
显示新号码。请求启动新工作流与实际启动工作流之间可能需要几秒钟。据我所知,list_executions()
API 调用没有 强一致性 保证。
您需要一些高度一致的东西,并且 DynamoDB atomic counters is a great solution for this problem. Amazon published a blog post 详细说明 DynamoDB 在这个具体场景中的使用。要点是您将尝试在 DynamoDB 中增加一个原子计数器,使用 limit
表达式,如果它会导致计数器超过某个值,该表达式会导致增加失败。捕获 failure/exception 是您的 Lambda 函数知道将消息发送回队列的方式。然后在工作流程结束时调用另一个 Lambda 函数来递减计数器。