Receiving duplicate decision tasks from AWS SWF when using boto3
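I created a simple SWF workflow, but I seem to be receiving multiple notifications that a new decision task is available. I am using the boto3 Python SDK.
I couldn't find good boto3 SWF sample code, so I started from the boto2 example at http://boto.cloudhackers.com/en/latest/swf_tut.html.
I set up my simple workflow with this script, which registers a domain, a workflow type, and an activity type: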
#!/usr/bin/python
import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

# Register the domain; a ClientError is reported as "already exists".
try:
    swf.register_domain(
        name="surroundiotest-swf",
        description="Surroundio test SWF domain",
        workflowExecutionRetentionPeriodInDays="10"
    )
except ClientError as e:
    print("Domain already exists: %s" % e.response.get("Error", {}).get("Code"))

# Register the workflow type.
try:
    swf.register_workflow_type(
        domain="surroundiotest-swf",
        name="testflow",
        version="0.1",
        description="testworkflow",
        defaultExecutionStartToCloseTimeout="250",
        defaultTaskStartToCloseTimeout="NONE",
        defaultChildPolicy="TERMINATE",
        defaultTaskList={"name": "testflow"}
    )
    print("testflow created!")
except ClientError as e:
    print("Workflow already exists: %s" % e.response.get("Error", {}).get("Code"))

# Register the activity type.
try:
    swf.register_activity_type(
        domain="surroundiotest-swf",
        name="testworker",
        version="0.1",
        description="testworker",
        defaultTaskStartToCloseTimeout="NONE",
        defaultTaskList={"name": "testflow"}
    )
    print("testworker created!")
except ClientError as e:
    print("Activity already exists: %s" % e.response.get("Error", {}).get("Code"))
My worker code:
#!/usr/bin/python
import boto3
from botocore.client import Config

# The read timeout is longer than the 60-second SWF long poll.
botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

print("Listening for Worker Tasks")

while True:
    task = swf.poll_for_activity_task(
        domain='surroundiotest-swf',
        taskList={'name': 'testflow'},
        identity='worker-1')

    if 'taskToken' not in task:
        print("Poll timed out, no new task. Repoll")
    else:
        print("New task arrived")
        swf.respond_activity_task_completed(
            taskToken=task['taskToken'],
            result='success'
        )
        print("Task Done")
My decider code:
#!/usr/bin/python
import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

print("Listening for Decision Tasks")

while True:
    newTask = swf.poll_for_decision_task(
        domain='surroundiotest-swf',
        taskList={'name': 'testflow'},
        identity='decider-1',
        reverseOrder=True)

    if 'taskToken' not in newTask:
        print("Poll timed out, no new task. Repoll")
    elif 'events' in newTask:
        # Ignore the Decision* bookkeeping events and look at the last remaining one.
        eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
        lastEvent = eventHistory[-1]

        if lastEvent['eventType'] == 'WorkflowExecutionStarted':
            print("Dispatching task to worker %s %s" % (newTask['workflowExecution'], newTask['workflowType']))
            swf.respond_decision_task_completed(
                taskToken=newTask['taskToken'],
                decisions=[
                    {
                        'decisionType': 'ScheduleActivityTask',
                        'scheduleActivityTaskDecisionAttributes': {
                            'activityType': {
                                'name': 'testworker',
                                'version': '0.1'
                            },
                            'activityId': 'activityid-1001',
                            'input': '',
                            'scheduleToCloseTimeout': 'NONE',
                            'scheduleToStartTimeout': 'NONE',
                            'startToCloseTimeout': 'NONE',
                            'heartbeatTimeout': 'NONE',
                            'taskList': {'name': 'testflow'},
                        }
                    }
                ]
            )
            print("Task Dispatched")
            # print json.dumps(newTask, default=json_serial, sort_keys=True, indent=4, separators=(',', ': '))
        elif lastEvent['eventType'] == 'ActivityTaskCompleted':
            swf.respond_decision_task_completed(
                taskToken=newTask['taskToken'],
                decisions=[
                    {
                        'decisionType': 'CompleteWorkflowExecution',
                        'completeWorkflowExecutionDecisionAttributes': {
                            'result': 'success'
                        }
                    }
                ]
            )
            print("Task Completed!")
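When I run these scripts and submit a request, the decider receives the task and dispatches it, and the worker receives the task and executes it. However, the decider then gets a task notification on every subsequent poll, so I see output like this from the decider: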
Listening for Decision Tasks
Poll timed out, no new task. Repoll
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
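SWF doesn't seem to be notified that the decider has already dispatched the task, and it keeps delivering the task to the decider. Am I doing something wrong in the decider setup, or do I need to communicate something else to SWF?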
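You have a small bug: you are polling for decision tasks with reverseOrder=True. From the API documentation for PollForDecisionTask:
reverseOrder
When set to true, returns the events in reverse order. By default the results are returned in ascending order of the eventTimestamp of the events.
With reverseOrder=True, the oldest event comes last, and that event is always WorkflowExecutionStarted. So after the activity task completes, your decider still sees WorkflowExecutionStarted as the last event and schedules the activity again every time.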
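To make the ordering problem concrete, here is a minimal sketch that applies the same "last non-Decision event" filter from the question to a history in both orders. The `history` list is hand-written for illustration and only approximates what SWF would return for this workflow; `last_non_decision_event` is a hypothetical helper, not part of boto3.

```python
# Hypothetical, hand-written history (ascending order), not real SWF output.
history = [
    {'eventId': 1, 'eventType': 'WorkflowExecutionStarted'},
    {'eventId': 2, 'eventType': 'DecisionTaskScheduled'},
    {'eventId': 3, 'eventType': 'DecisionTaskStarted'},
    {'eventId': 4, 'eventType': 'DecisionTaskCompleted'},
    {'eventId': 5, 'eventType': 'ActivityTaskScheduled'},
    {'eventId': 6, 'eventType': 'ActivityTaskStarted'},
    {'eventId': 7, 'eventType': 'ActivityTaskCompleted'},
    {'eventId': 8, 'eventType': 'DecisionTaskScheduled'},
    {'eventId': 9, 'eventType': 'DecisionTaskStarted'},
]


def last_non_decision_event(events):
    """Same filter as the decider in the question: drop Decision* events, take the last."""
    filtered = [e for e in events if not e['eventType'].startswith('Decision')]
    return filtered[-1]


# Default (ascending) order: the last event is the newest one.
ascending = last_non_decision_event(history)

# reverseOrder=True: the list is newest-first, so the last event is the oldest.
descending = last_non_decision_event(list(reversed(history)))

print(ascending['eventType'])   # ActivityTaskCompleted
print(descending['eventType'])  # WorkflowExecutionStarted
```

With the reversed list, the filter lands on WorkflowExecutionStarted no matter how far the workflow has progressed, which matches the repeated "Dispatching task to worker" output in the question.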
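Generally, you want to use previousStartedEventId and process only the new events that occurred after the previous decision task. Looking only at the last activity event is not always enough, because there may be several events to process.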
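A rough sketch of that idea follows, assuming the decision task dict carries `previousStartedEventId` and `events` as in the PollForDecisionTask response. The helpers `new_events` and `choose_decision` and the two sample tasks are hypothetical and hand-written for illustration. The sketch assumes `previousStartedEventId` is 0 for the first decision task, and it does not follow `nextPageToken`, so a long history would need additional paging.

```python
def new_events(task):
    """Return only the events newer than the previous decision task, oldest first."""
    previous = task.get('previousStartedEventId', 0)
    fresh = [e for e in task.get('events', []) if e['eventId'] > previous]
    # Sort by eventId so the result does not depend on reverseOrder.
    return sorted(fresh, key=lambda e: e['eventId'])


def choose_decision(task):
    """Pick a decision type for this single-activity workflow from the new events only."""
    types = [e['eventType'] for e in new_events(task)]
    if 'ActivityTaskCompleted' in types:
        return 'CompleteWorkflowExecution'
    if 'WorkflowExecutionStarted' in types:
        return 'ScheduleActivityTask'
    return None  # nothing this sketch knows how to handle


# Hypothetical first decision task, returned newest-first (reverseOrder=True).
first_task = {
    'previousStartedEventId': 0,
    'events': [
        {'eventId': 3, 'eventType': 'DecisionTaskStarted'},
        {'eventId': 2, 'eventType': 'DecisionTaskScheduled'},
        {'eventId': 1, 'eventType': 'WorkflowExecutionStarted'},
    ],
}

# Hypothetical second decision task, delivered after the activity finished.
event_types = [
    'WorkflowExecutionStarted', 'DecisionTaskScheduled', 'DecisionTaskStarted',
    'DecisionTaskCompleted', 'ActivityTaskScheduled', 'ActivityTaskStarted',
    'ActivityTaskCompleted', 'DecisionTaskScheduled', 'DecisionTaskStarted',
]
second_task = {
    'previousStartedEventId': 3,
    'events': [{'eventId': i, 'eventType': t}
               for i, t in enumerate(event_types, start=1)],
}

print(choose_decision(first_task))   # ScheduleActivityTask
print(choose_decision(second_task))  # CompleteWorkflowExecution
```

In a real decider, the returned name would select which decisions list to pass to `respond_decision_task_completed`. Because the second task only considers events after event 3, the original WorkflowExecutionStarted event can no longer trigger a second scheduling.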