boto3.client('stepfunctions').get_activity_task() 挂起
boto3.client('stepfunctions').get_activity_task() hangs
我有一个项目要实施,我需要与另一个部门创建的 Step Function 进行通信。我是 Step Functions 的新手,所以如果我遗漏任何内容,请多多包涵。
我们有一个 UI,用户可以在其中请求他们的数据或删除他们的数据。此请求被发送到 API 网关,然后发送到创建多个 workers/subscriptions 的 Step Function。我的任务是创建一个 Azure 函数(使用 Python)来处理任务,并 link 到我们拥有数据的相关位置,然后将其删除或 return 到 S3 存储桶。因此我有以下脚本:
import datetime
import logging
import boto3
import os
import json
workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']
sfn_client = boto3.client(
service_name='stepfunctions',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
activity = sfn_client.get_activity_task(
activityArn = activityArn,
workerName = workerName
)
task_token, task = activity['taskToken'], json.loads(activity['input'])
# TODO Process Task
我注意到每次我 运行 activity = ...
我都会得到一个新任务而不是一个列表,并且已经阅读了我需要使用 send_task_failure()
、send_task_heartbeat()
和 send_task_success()
方法很好。因为它 return 每个 运行 一个 activity 我计划 运行 一个循环,直到我没有更多的活动但是当我结束时(或者当没有活动 运行) 脚本一直挂起直到超时。
有没有一种方法可以只计算未开始的活动数,以便我可以使用它来循环,或者有更好的方法吗?
好的,所以在阅读文档后我发现我必须添加一个 read_timeout
> 而不是默认值...我认为默认值是 60 秒所以我添加了一个 65 秒的超时 [=14] =]
import datetime
import logging
import boto3
from botocore.client import Config
import os
import json
connect_timeout = creds['connect_timeout'] + 5
read_timeout = creds['read_timeout'] + 5
workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']
cfg = creds['cfg']
config = Config(
connect_timeout=connect_timeout,
read_timeout=read_timeout,
retries={'max_attempts': 0}
)
sfn_client = boto3.client(
service_name='stepfunctions',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
config=config
)
while True:
activity_task = sfn_client.get_activity_task(
activityArn = activityArn,
workerName = workerName
)
if 'input' not in activity_task.keys() or 'taskToken' not in activity_task.keys():
print(f"No more activity tasks")
break
taskToken, task = activity_task['taskToken'], json.loads(activity_task['input'])
在最后通过它时 returns 一个 JSON 与所有其他活动具有相同的键,但没有 input
和 taskToken
我有一个项目要实施,我需要与另一个部门创建的 Step Function 进行通信。我是 Step Functions 的新手,所以如果我遗漏任何内容,请多多包涵。
我们有一个 UI,用户可以在其中请求他们的数据或删除他们的数据。此请求被发送到 API 网关,然后发送到创建多个 workers/subscriptions 的 Step Function。我的任务是创建一个 Azure 函数(使用 Python)来处理任务,并 link 到我们拥有数据的相关位置,然后将其删除或 return 到 S3 存储桶。因此我有以下脚本:
import datetime
import logging
import boto3
import os
import json
workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']
sfn_client = boto3.client(
service_name='stepfunctions',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
activity = sfn_client.get_activity_task(
activityArn = activityArn,
workerName = workerName
)
task_token, task = activity['taskToken'], json.loads(activity['input'])
# TODO Process Task
我注意到每次我 运行 activity = ...
我都会得到一个新任务而不是一个列表,并且已经阅读了我需要使用 send_task_failure()
、send_task_heartbeat()
和 send_task_success()
方法很好。因为它 return 每个 运行 一个 activity 我计划 运行 一个循环,直到我没有更多的活动但是当我结束时(或者当没有活动 运行) 脚本一直挂起直到超时。
有没有一种方法可以只计算未开始的活动数,以便我可以使用它来循环,或者有更好的方法吗?
好的,所以在阅读文档后我发现我必须添加一个 read_timeout
> 而不是默认值...我认为默认值是 60 秒所以我添加了一个 65 秒的超时 [=14] =]
import datetime
import logging
import boto3
from botocore.client import Config
import os
import json
connect_timeout = creds['connect_timeout'] + 5
read_timeout = creds['read_timeout'] + 5
workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']
cfg = creds['cfg']
config = Config(
connect_timeout=connect_timeout,
read_timeout=read_timeout,
retries={'max_attempts': 0}
)
sfn_client = boto3.client(
service_name='stepfunctions',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
config=config
)
while True:
activity_task = sfn_client.get_activity_task(
activityArn = activityArn,
workerName = workerName
)
if 'input' not in activity_task.keys() or 'taskToken' not in activity_task.keys():
print(f"No more activity tasks")
break
taskToken, task = activity_task['taskToken'], json.loads(activity_task['input'])
在最后通过它时 returns 一个 JSON 与所有其他活动具有相同的键,但没有 input
和 taskToken