(Step Functions Activity Worker)在 boto 中处理长轮询超时的最佳实践?

(Step Functions Activity Worker) Best practice for handling long polling timeouts in boto?

我正在开发我的第一个 Step Functions Activity Worker (EC2)。可以预见的是,在 Step Functions 状态机没有 activity 的情况下进行 5 分钟长轮询后,客户端连接超时并出现错误:

botocore.exceptions.ReadTimeoutError:端点读取超时 URL:“https://states.us-east-1.amazonaws.com/

捕获错误并重试长轮询(当没有 activity 时每 5 分钟一次),还是尝试提前终止调用并在错误发生前重试更好?我考虑过使用不同类型的循环,但我想最大化长轮询的价值,而不是重复请求 Step Functions API(尽管如果这是最好的方法,我会这样做)。

谢谢,

安德鲁

import boto3
import time
import json

region = 'us-east-1'
activity_arn = 'arn:aws:states:us-east-1:754112345676:activity:Process_Imagery' 

while True:
    client = boto3.client('stepfunctions', region_name=region)
    response = client.get_activity_task(activityArn=activity_arn,
                                    workerName='imagery_processor')    
    activity_token = response['taskToken']
    input_params = json.loads(response['input'])

    print("================")
    print(input_params)
    client.send_task_success(taskToken=activity_token, output='true')

我相信我在这里回答了我自己的问题。 AWS documentation states: "The maximum time the service holds on to the request before responding is 60 seconds. If no task is available within 60 seconds, the poll returns a taskToken with a null string."

但是,我相信 StepFunctions 的 JSON 响应根本没有 'taskToken',而不是返回字符串。这个 while 循环有效:

import boto3
import time
import json
from botocore.config import Config as BotoCoreConfig

region = 'us-east-1'
boto_config = BotoCoreConfig(read_timeout=70, region_name=region)
sf_client = boto3.client('stepfunctions', config=boto_config)
activity_arn = 'arn:aws:states:us-east-1:754185699999:activity:Process_Imagery'  

while True:
    response = sf_client.get_activity_task(activityArn=activity_arn,
                                    workerName='imagery_processor')    

    if 'taskToken' not in response:
        print('No Task Token')
        # time.sleep(2)
    else:
        print(response['taskToken']) 
        print("===================")
        activity_token = response['taskToken']
        sf_client.send_task_success(taskToken=activity_token, output='true')