AWS 检查 StateMachines/StepFunctions 并发运行

AWS Checking StateMachines/StepFunctions concurrent runs

我在处理其中包含 GlueJob 任务的 StateMachine(Step Function)的并发运行时遇到了很多问题。

状态机由 Lambda 启动,该 Lambda 由 FIFO SQS 队列触发。

lambda 获取消息,检查状态机实例的数量 运行,如果该数量低于 GlueJob 并发运行阈值,它会启动状态机。

我遇到的问题是此检查大部分时间都失败了。尽管我的 GlueJob 没有足够的并发可用,但状态机启动了。显然,SQS 队列传递给 lambda 的消息得到处理,因此如果状态机由于这个原因而失败,则该消息将永远消失(除非我捕获异常并将新消息发送回队列)。

我认为此行为是由于我的 lambda 处理消息的速度过快(尽管它是一个 FIFO 队列,所以一次处理 1 条消息),而且我的检查器跟不上。

我在这里和那里实施了一些 time.sleep() 以查看情况是否有所好转,但没有实质性改进。

我想问一下您是否遇到过这样的问题,以及您是如何通过编程解决这些问题的。

提前致谢!

这是我的检查器:

def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):

    if next_token:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
    else:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')

    cnt += len(response['executions'])

    if cnt > 0 and 'nextToken' in response:
        return get_running_workflows(cnt, response['nextToken'])

    return cnt

编辑 24-01-2022:

这是基于@Mark-B 答案的实现。

我创建了一个 DynamoDB table 如下:

resources:
  Resources:
    concurrencyHandler:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: concurrencyHandler
        AttributeDefinitions:
          - AttributeName: concurrencyProcess
            AttributeType: S
        KeySchema:
          - AttributeName: concurrencyProcess
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST

Primary Key 用来存放我处理并发的进程名。如果我想重用相同的 table 来处理其他进程的并发,这会有所帮助。

然后,在使用来自 SQS 的消息并触发我的 Step Function 的 lambda 中,我编写了一个带有条件检查的原子计数器,如下所示:

    try:
        response = concurrency_table.update_item(
            Key={'concurrencyProcess': processName},
            ExpressionAttributeNames={
                '#C': 'concurrencyCount',
                '#O': step_function_name,
            },
            ExpressionAttributeValues={
                ':start': 0,
                ':inc': 1,
                ':limit': int(glue_max_concurrent_runs),
                ':time': datetime.datetime.utcnow().isoformat()
            },
            UpdateExpression="SET #C = if_not_exists(#C, :start) + :inc, #O = :time",
            ConditionExpression="#C < :limit AND attribute_not_exists(#O)",
            ReturnValues="UPDATED_NEW"
        )

    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        else:
            raise Exception("Either max Concurrency number reached or "
                            "Stepfunction is requesting second lock")

只要有新消息到达,此代码就会递增计数器。如果计数器 >= 比限制,它将失败。所以消息会被送回队列,稍后再处理。

显然,我已经在我的 StepFunction 工作流中设置了一个清理的最后一步,如果工作流成功,如果工作流因任何原因失败的话。这是 .yaml:

  UpdateConcurrencyCountSuccess:
    Type: Task
    Resource: arn:aws:states:::dynamodb:updateItem
    Parameters:
      TableName: ${self:custom.concurrencyTable}
      Key:
        concurrencyProcess:
          S: {processName}
      ExpressionAttributeNames:
        '#C': concurrencyCount
        '#O.$': $$.Execution.Name
      ExpressionAttributeValues:
        :dec:
          N: '1'
      UpdateExpression: "SET #C = #C - :dec REMOVE #O"
      ConditionExpression: attribute_exists(#O)
      ReturnValues: UPDATED_NEW
    Retry:
      - ErrorEquals:
          - DynamoDB.ConditionalCheckFailedException
        MaxAttempts: 0
      - ErrorEquals:
          - States.ALL
        IntervalSeconds: 5
        MaxAttempts: 20
        BackoffRate: 1.4
    ResultPath: $.update_concurrency_success 
    End: true
    Catch:
      - ErrorEquals:
          - States.ALL
        Next: PublishToSNSFailed
        ResultPath: $.error

  # Update Concurrency Count table, failed flow
  UpdateConcurrencyCountFail:
    Type: Task
    Resource: arn:aws:states:::dynamodb:updateItem
    Parameters:
      TableName: ${self:custom.concurrencyTable}
      Key:
        concurrencyProcess:
          S: {processName}
      ExpressionAttributeNames:
        '#C': concurrencyCount
        '#O.$': $$.Execution.Name
      ExpressionAttributeValues:
        :dec:
          N: '1'
      UpdateExpression: "SET #C = #C - :dec REMOVE #O"
      ConditionExpression: attribute_exists(#O)
      ReturnValues: UPDATED_NEW
    Retry:
      - ErrorEquals:
          - DynamoDB.ConditionalCheckFailedException
        MaxAttempts: 0
      - ErrorEquals:
          - States.ALL
        IntervalSeconds: 5
        MaxAttempts: 20
        BackoffRate: 1.4
    ResultPath: $.update_concurrency_failed     
    Next: ExecutionFailed
    Catch:
      - ErrorEquals:
          - States.ALL
        Next: ExecutionFailed
        ResultPath: $.error

注意 DynamoDB 中的属性 #O 是 Step Function Name,并将 starting-execution-time 作为值。步骤函数执行的名称在我的 trigger-lambda 中生成(它是单义的)。在我的工作流程中,我可以通过 $$Execution.Name 访问该名称,以便在清理过程中删除该属性。

将 step 函数的唯一名称作为属性可以帮助调试,以防(尽管非常遥远)执行开始但无法减少计数器。

编辑 02-02-2022:

在对该解决方案进行压力测试后,我得出的结论是它无法保持状态机的计数运行:当最大值设置为 10 时,不知何故给出了第 11 个锁。

最重要的是,我必须说,Glue 在“SUCCESS”之后有某种不可见的“状态”,这导致并发性低于可用锁(例如:0 胶合并发可用,但 2/3 空闲锁).

希望 AWS 可以实现一些并发控制和从 SQS 到 StepFunction 的直接触发器

您将 运行 使用此方法遇到问题,因为启动新流程的调用可能不会立即导致 list_executions() 显示新号码。请求启动新工作流与实际启动工作流之间可能需要几秒钟。据我所知,list_executions() API 调用没有 强一致性 保证。

您需要一些高度一致的东西,并且 DynamoDB atomic counters is a great solution for this problem. Amazon published a blog post 详细说明 DynamoDB 在这个具体场景中的使用。要点是您将尝试在 DynamoDB 中增加一个原子计数器,使用 limit 表达式,如果它会导致计数器超过某个值,该表达式会导致增加失败。捕获 failure/exception 是您的 Lambda 函数知道将消息发送回队列的方式。然后在工作流程结束时调用另一个 Lambda 函数来递减计数器。