boto3.client('stepfunctions').get_activity_task() 挂起

boto3.client('stepfunctions').get_activity_task() hangs

我有一个项目要实施,我需要与另一个部门创建的 Step Function 进行通信。我是 Step Functions 的新手,所以如果我遗漏任何内容,请多多包涵。

我们有一个 UI,用户可以在其中请求他们的数据或删除他们的数据。此请求被发送到 API 网关,然后发送到创建多个 workers/subscriptions 的 Step Function。我的任务是创建一个 Azure 函数(使用 Python)来处理任务,并 link 到我们拥有数据的相关位置,然后将其删除或 return 到 S3 存储桶。因此我有以下脚本:

import datetime
import logging
import boto3
import os
import json

workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']

sfn_client = boto3.client(
    service_name='stepfunctions',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

activity = sfn_client.get_activity_task(
    activityArn = activityArn,
    workerName = workerName
)

task_token, task = activity['taskToken'], json.loads(activity['input'])
# TODO Process Task

我注意到每次我 运行 activity = ... 我都会得到一个新任务而不是一个列表,并且已经阅读了我需要使用 send_task_failure()send_task_heartbeat()send_task_success() 方法很好。因为它 return 每个 运行 一个 activity 我计划 运行 一个循环,直到我没有更多的活动但是当我结束时(或者当没有活动 运行) 脚本一直挂起直到超时。

有没有一种方法可以只计算未开始的活动数,以便我可以使用它来循环,或者有更好的方法吗?

好的,所以在阅读文档后我发现我必须添加一个 read_timeout > 而不是默认值...我认为默认值是 60 秒所以我添加了一个 65 秒的超时 [=14] =]

import datetime
import logging
import boto3
from botocore.client import Config
import os
import json

connect_timeout = creds['connect_timeout'] + 5
read_timeout = creds['read_timeout'] + 5
workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']
cfg = creds['cfg']

config = Config(
    connect_timeout=connect_timeout,
    read_timeout=read_timeout,
    retries={'max_attempts': 0}
)

sfn_client = boto3.client(
    service_name='stepfunctions',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
    config=config
)

while True:
    activity_task = sfn_client.get_activity_task(
        activityArn = activityArn,
        workerName = workerName
    )
    
    if 'input' not in activity_task.keys() or 'taskToken' not in activity_task.keys():
        print(f"No more activity tasks")
        break

    taskToken, task = activity_task['taskToken'], json.loads(activity_task['input'])

在最后通过它时 returns 一个 JSON 与所有其他活动具有相同的键,但没有 inputtaskToken