如何在 Amazon SWF hello_sample 示例中添加第二个 activity
How to add second activity in Amazon SWF hello_sample example
我已经成功实现了名为 hello_sample 的简单 Java Amazon SWF 示例。我创建了 ActivityWorker 可执行文件,它轮询 SWF 以获取待处理的 activity 任务。我创建了 WorkflowWorker 可执行文件来轮询 SWF 以获取决策任务,并且我有一个 WorkflowStarter 可执行文件来启动工作流执行。它如预期那样工作。我不明白的是:如何配置并添加第二个 activity,使其在第一个 activity 之后运行?
WorkflowWorker:
/**
 * Decider for the hello_sample workflow.
 *
 * <p>Polls Amazon SWF for decision tasks on {@code Constants.TASKLIST} and answers each task
 * with the decisions derived from the workflow's event history: schedule the activity when
 * none is outstanding, or complete the workflow once an activity has completed.
 */
public class WorkflowWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();

    /**
     * Polls for decision tasks in an endless loop and hands each one to
     * {@link #executeDecisionTask(String, List)}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PollForDecisionTaskRequest task_request =
            new PollForDecisionTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST));

        while (true) {
            System.out.println(
                "WorkflowWorker is polling for a decision task from the tasklist '" +
                Constants.TASKLIST + "' in the domain '" +
                Constants.DOMAIN + "'.");

            DecisionTask task = swf.pollForDecisionTask(task_request);

            String taskToken = task.getTaskToken();
            // No token means there is nothing to decide on this poll; just poll again.
            if (taskToken != null) {
                try {
                    executeDecisionTask(taskToken, task.getEvents());
                }
                catch (Throwable th) {
                    // Keep the worker alive: one failing decision task must not end the loop.
                    th.printStackTrace();
                }
            }
        }
    }

    /**
     * Replays the event history of one decision task and responds to SWF with the resulting
     * decisions.
     *
     * <p>Outstanding activity tasks are tracked by the event id of their
     * {@code ActivityTaskScheduled} event rather than by scheduled/open counters. A task can be
     * closed by a timeout without ever having been started; id-based tracking handles that case
     * correctly, so the activity is rescheduled instead of the workflow stalling.
     *
     * @param taskToken token of the decision task being answered
     * @param events    event history delivered with the decision task
     * @throws Throwable if processing the history or responding to SWF fails
     */
    private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable {
        List<Decision> decisions = new ArrayList<Decision>();
        String workflow_input = null;
        // Event ids of ActivityTaskScheduled events whose task has not completed, failed or timed out.
        List<Long> outstanding_activities = new ArrayList<Long>();
        // Set when SWF rejected a schedule decision; the activity is then not scheduled again.
        boolean schedule_failed = false;
        boolean activity_completed = false;
        String result = null;

        System.out.println("WorkflowWorker is executing the decision task for the history events: [");
        for (HistoryEvent event : events) {
            System.out.println(" " + event);
            switch(event.getEventType()) {
                case "WorkflowExecutionStarted":
                    workflow_input = event.getWorkflowExecutionStartedEventAttributes().getInput();
                    break;
                case "ActivityTaskScheduled":
                    outstanding_activities.add(event.getEventId());
                    break;
                case "ScheduleActivityTaskFailed":
                    schedule_failed = true;
                    break;
                case "ActivityTaskCompleted":
                    closeActivity(outstanding_activities,
                        event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
                    activity_completed = true;
                    result = event.getActivityTaskCompletedEventAttributes().getResult();
                    break;
                case "ActivityTaskFailed":
                    closeActivity(outstanding_activities,
                        event.getActivityTaskFailedEventAttributes().getScheduledEventId());
                    break;
                case "ActivityTaskTimedOut":
                    closeActivity(outstanding_activities,
                        event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
                    break;
                // ActivityTaskStarted and all other event types do not change what is outstanding.
            }
        }
        System.out.println("]");

        if (activity_completed) {
            decisions.add(
                new Decision()
                    .withDecisionType(DecisionType.CompleteWorkflowExecution)
                    .withCompleteWorkflowExecutionDecisionAttributes(
                        new CompleteWorkflowExecutionDecisionAttributes()
                            .withResult(result)));
        }
        else if (outstanding_activities.isEmpty() && !schedule_failed) {
            // Nothing scheduled or running (first decision, or the last attempt failed / timed out).
            ScheduleActivityTaskDecisionAttributes attrs =
                new ScheduleActivityTaskDecisionAttributes()
                    .withActivityType(new ActivityType()
                        .withName(Constants.ACTIVITY)
                        .withVersion(Constants.ACTIVITY_VERSION))
                    .withActivityId(UUID.randomUUID().toString())
                    .withInput(workflow_input);

            decisions.add(
                new Decision()
                    .withDecisionType(DecisionType.ScheduleActivityTask)
                    .withScheduleActivityTaskDecisionAttributes(attrs));
        }
        // Otherwise an instance of the activity is already scheduled or running (or scheduling
        // was rejected): respond with no decisions. Another decision task arrives once the
        // activity completes, fails or times out.

        System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions);
        swf.respondDecisionTaskCompleted(
            new RespondDecisionTaskCompletedRequest()
                .withTaskToken(taskToken)
                .withDecisions(decisions));
    }

    /** Marks the activity task scheduled by the given event id as no longer outstanding. */
    private static void closeActivity(List<Long> outstanding, Long scheduledEventId) {
        // The cast selects remove(Object); remove(int) would treat the id as a list index.
        outstanding.remove((Object) scheduledEventId);
    }
}
ActivityWorker:
/**
 * Activity worker for the hello_sample workflow.
 *
 * <p>Polls Amazon SWF for activity tasks on {@code Constants.TASKLIST}, runs each task and
 * reports the outcome (completed or failed) back to SWF. A JVM shutdown hook asks the poll
 * loop to stop and waits up to 60 seconds for it to exit.
 */
public class ActivityWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    // Counted down by main() once the poll loop has exited; the shutdown hook waits on it.
    private static final CountDownLatch waitForTermination = new CountDownLatch(1);
    // Set by the shutdown hook; checked by the poll loop before every poll.
    private static volatile boolean terminate = false;

    /**
     * Prints a greeting for the input and writes the input to {@code g_species.txt} in the
     * current working directory.
     *
     * <p>I/O failures (open, write, or flush on close) propagate to the caller so the task is
     * reported to SWF as failed instead of being reported as completed without the file.
     *
     * @param g_species the activity input
     * @return the input, unchanged, as the activity result
     * @throws Throwable if the file cannot be written
     */
    private static String executeActivityTask(String g_species) throws Throwable {
        String s = " ******** Hello, " + g_species + "!";
        System.out.println(s);

        Path filePath = Paths.get(".").toAbsolutePath().normalize().resolve("g_species.txt");

        // try-with-resources closes (and thereby flushes) the writer on every path.
        try (BufferedWriter output = new BufferedWriter(new FileWriter(filePath.toFile()))) {
            output.write(g_species);
        }

        return g_species;
    }

    /**
     * Registers the shutdown hook and runs the poll loop until termination is requested.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    terminate = true;
                    System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down.");
                    waitForTermination.await(60, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                    // Restore the interrupt status rather than dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        });

        try {
            pollAndExecute();
        }
        finally {
            // Release the shutdown hook even if the poll loop ended with an exception.
            waitForTermination.countDown();
        }
    }

    /**
     * Polls for activity tasks until {@code terminate} is set, executing each task and
     * responding to SWF with its result or with the failure reason.
     */
    public static void pollAndExecute() {
        while (!terminate) {
            System.out.println("ActivityWorker is polling for an activity task from the tasklist '"
                + Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'.");

            ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST)));

            String taskToken = task.getTaskToken();
            // No token means this poll delivered no task; loop and poll again.
            if (taskToken != null) {
                String result = null;
                Throwable error = null;

                try {
                    System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'.");
                    result = executeActivityTask(task.getInput());
                }
                catch (Throwable th) {
                    error = th;
                }

                if (error == null) {
                    System.out.println("The activity task succeeded with result '" + result + "'.");
                    swf.respondActivityTaskCompleted(
                        new RespondActivityTaskCompletedRequest()
                            .withTaskToken(taskToken)
                            .withResult(result));
                }
                else {
                    System.out.println("The activity task failed with the error '"
                        + error.getClass().getSimpleName() + "'.");
                    swf.respondActivityTaskFailed(
                        new RespondActivityTaskFailedRequest()
                            .withTaskToken(taskToken)
                            .withReason(error.getClass().getSimpleName())
                            .withDetails(error.getMessage()));
                }
            }
        }
    }
}
启动这一切的 WorkflowStarter:
/**
 * Starts one execution of the hello_sample workflow with a fixed workflow id.
 */
public class WorkflowStarter {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution";

    /**
     * Starts the workflow execution and prints its run id.
     *
     * @param args optional; {@code args[0]} replaces the default workflow input "Amazon SWF"
     */
    public static void main(String[] args) {
        String input = (args.length > 0) ? args[0] : "Amazon SWF";

        System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION +
            "' with input '" + input + "'.");

        WorkflowType type = new WorkflowType()
            .withName(Constants.WORKFLOW)
            .withVersion(Constants.WORKFLOW_VERSION);

        StartWorkflowExecutionRequest request = new StartWorkflowExecutionRequest()
            .withDomain(Constants.DOMAIN)
            .withWorkflowType(type)
            .withWorkflowId(WORKFLOW_EXECUTION)
            .withInput(input)
            // Start-to-close timeout for the whole execution, passed as a string.
            .withExecutionStartToCloseTimeout("90");

        Run started = swf.startWorkflowExecution(request);

        System.out.println("Workflow execution started with the run id '" +
            started.getRunId() + "'.");
    }
}
我建议不要重新发明轮子,使用亚马逊官方支持的AWS Flow Framework for Java。它已经实现了所有底层细节,并允许您直接关注工作流的业务逻辑。
这是一个使用三个活动的示例工作流(取自 developer guide)。
活动接口:
import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;
/**
 * Activities used by the greeter workflow, registered as version "1.0" with default
 * schedule-to-start and start-to-close timeouts of 300 and 10 seconds.
 */
@Activities(version = "1.0")
@ActivityRegistrationOptions(
    defaultTaskScheduleToStartTimeoutSeconds = 300,
    defaultTaskStartToCloseTimeoutSeconds = 10)
public interface GreeterActivities {

    /** Returns the name to greet. */
    String getName();

    /**
     * Builds the greeting text for a name.
     *
     * @param name the name to greet
     * @return the greeting
     */
    String getGreeting(String name);

    /**
     * Emits the given text.
     *
     * @param what the text to emit
     */
    void say(String what);
}
活动实现:
/**
 * Straightforward implementation of {@link GreeterActivities}: a fixed name, a "Hello "
 * prefix, and output to standard out.
 */
public class GreeterActivitiesImpl implements GreeterActivities {

    /** Returns the fixed name "World". */
    @Override
    public String getName() {
        final String fixedName = "World";
        return fixedName;
    }

    /** Returns {@code name} prefixed with "Hello ". */
    @Override
    public String getGreeting(String name) {
        final String greeting = "Hello " + name;
        return greeting;
    }

    /** Prints {@code what} on its own line to standard out. */
    @Override
    public void say(String what) {
        System.out.println(what);
    }
}
工作流接口:
import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;
/**
 * Workflow contract for the greeter example; executions default to a start-to-close
 * timeout of 3600 seconds.
 */
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600)
@Workflow
public interface GreeterWorkflow {

    /** Workflow entry point, declared as version "1.0". */
    @Execute(version = "1.0")
    void greet();
}
工作流实现:
import com.amazonaws.services.simpleworkflow.flow.core.Promise;
public class GreeterWorkflowImpl implements GreeterWorkflow {
private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl();
public void greet() {
Promise<String> name = operations.getName();
Promise<String> greeting = operations.getGreeting(name);
operations.say(greeting);
}
}
同时托管这两者的 worker。显然也可以把它拆分成单独的 activity worker 和 workflow worker:
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
/**
 * Hosts both the activity worker and the workflow worker for the greeter example in one
 * process, polling the same domain and task list.
 */
public class GreeterWorker {

    /**
     * Builds the SWF client from environment credentials and starts both workers.
     *
     * @param args unused
     * @throws Exception if client setup or worker start-up fails
     */
    public static void main(String[] args) throws Exception {
        final String domain = "helloWorldWalkthrough";
        final String taskListToPoll = "HelloWorldList";

        ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70 * 1000);

        // Credentials come from the environment.
        String accessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_KEY");
        AWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretKey);

        AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(credentials, clientConfig);
        service.setEndpoint("https://swf.us-east-1.amazonaws.com");

        // Activity worker is configured and started first, then the workflow worker.
        ActivityWorker activityWorker = new ActivityWorker(service, domain, taskListToPoll);
        activityWorker.addActivitiesImplementation(new GreeterActivitiesImpl());
        activityWorker.start();

        WorkflowWorker workflowWorker = new WorkflowWorker(service, domain, taskListToPoll);
        workflowWorker.addWorkflowImplementationType(GreeterWorkflowImpl.class);
        workflowWorker.start();
    }
}
工作流启动器:
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
/**
 * Starter for the greeter example: builds an SWF client and kicks off one workflow execution.
 */
public class GreeterMain {

    /**
     * Builds the SWF client from environment credentials and starts the workflow with the
     * workflow id "someID".
     *
     * @param args unused
     * @throws Exception if client setup or starting the workflow fails
     */
    public static void main(String[] args) throws Exception {
        final String domain = "helloWorldWalkthrough";

        ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70 * 1000);

        // Credentials come from the environment.
        String accessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_KEY");
        AWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretKey);

        AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(credentials, clientConfig);
        service.setEndpoint("https://swf.us-east-1.amazonaws.com");

        // NOTE(review): the external client factory types are not defined in this file —
        // presumably generated by the Flow framework from GreeterWorkflow; confirm.
        GreeterWorkflowClientExternalFactory clientFactory =
            new GreeterWorkflowClientExternalFactoryImpl(service, domain);
        GreeterWorkflowClientExternal workflowClient = clientFactory.getClient("someID");
        workflowClient.greet();
    }
}
我已经成功实现了名为 hello_sample 的简单 Java Amazon SWF 示例。我创建了 ActivityWorker 可执行文件,它轮询 SWF 以获取待处理的 activity 任务。我创建了 WorkflowWorker 可执行文件来轮询 SWF 以获取决策任务,并且我有一个 WorkflowStarter 可执行文件来启动工作流执行。它如预期那样工作。我不明白的是:如何配置并添加第二个 activity,使其在第一个 activity 之后运行?
WorkflowWorker:
/**
 * Decider for the hello_sample workflow.
 *
 * <p>Polls Amazon SWF for decision tasks on {@code Constants.TASKLIST} and answers each task
 * with the decisions derived from the workflow's event history: schedule the activity when
 * none is outstanding, or complete the workflow once an activity has completed.
 */
public class WorkflowWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();

    /**
     * Polls for decision tasks in an endless loop and hands each one to
     * {@link #executeDecisionTask(String, List)}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PollForDecisionTaskRequest task_request =
            new PollForDecisionTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST));

        while (true) {
            System.out.println(
                "WorkflowWorker is polling for a decision task from the tasklist '" +
                Constants.TASKLIST + "' in the domain '" +
                Constants.DOMAIN + "'.");

            DecisionTask task = swf.pollForDecisionTask(task_request);

            String taskToken = task.getTaskToken();
            // No token means there is nothing to decide on this poll; just poll again.
            if (taskToken != null) {
                try {
                    executeDecisionTask(taskToken, task.getEvents());
                }
                catch (Throwable th) {
                    // Keep the worker alive: one failing decision task must not end the loop.
                    th.printStackTrace();
                }
            }
        }
    }

    /**
     * Replays the event history of one decision task and responds to SWF with the resulting
     * decisions.
     *
     * <p>Outstanding activity tasks are tracked by the event id of their
     * {@code ActivityTaskScheduled} event rather than by scheduled/open counters. A task can be
     * closed by a timeout without ever having been started; id-based tracking handles that case
     * correctly, so the activity is rescheduled instead of the workflow stalling.
     *
     * @param taskToken token of the decision task being answered
     * @param events    event history delivered with the decision task
     * @throws Throwable if processing the history or responding to SWF fails
     */
    private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable {
        List<Decision> decisions = new ArrayList<Decision>();
        String workflow_input = null;
        // Event ids of ActivityTaskScheduled events whose task has not completed, failed or timed out.
        List<Long> outstanding_activities = new ArrayList<Long>();
        // Set when SWF rejected a schedule decision; the activity is then not scheduled again.
        boolean schedule_failed = false;
        boolean activity_completed = false;
        String result = null;

        System.out.println("WorkflowWorker is executing the decision task for the history events: [");
        for (HistoryEvent event : events) {
            System.out.println(" " + event);
            switch(event.getEventType()) {
                case "WorkflowExecutionStarted":
                    workflow_input = event.getWorkflowExecutionStartedEventAttributes().getInput();
                    break;
                case "ActivityTaskScheduled":
                    outstanding_activities.add(event.getEventId());
                    break;
                case "ScheduleActivityTaskFailed":
                    schedule_failed = true;
                    break;
                case "ActivityTaskCompleted":
                    closeActivity(outstanding_activities,
                        event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
                    activity_completed = true;
                    result = event.getActivityTaskCompletedEventAttributes().getResult();
                    break;
                case "ActivityTaskFailed":
                    closeActivity(outstanding_activities,
                        event.getActivityTaskFailedEventAttributes().getScheduledEventId());
                    break;
                case "ActivityTaskTimedOut":
                    closeActivity(outstanding_activities,
                        event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
                    break;
                // ActivityTaskStarted and all other event types do not change what is outstanding.
            }
        }
        System.out.println("]");

        if (activity_completed) {
            decisions.add(
                new Decision()
                    .withDecisionType(DecisionType.CompleteWorkflowExecution)
                    .withCompleteWorkflowExecutionDecisionAttributes(
                        new CompleteWorkflowExecutionDecisionAttributes()
                            .withResult(result)));
        }
        else if (outstanding_activities.isEmpty() && !schedule_failed) {
            // Nothing scheduled or running (first decision, or the last attempt failed / timed out).
            ScheduleActivityTaskDecisionAttributes attrs =
                new ScheduleActivityTaskDecisionAttributes()
                    .withActivityType(new ActivityType()
                        .withName(Constants.ACTIVITY)
                        .withVersion(Constants.ACTIVITY_VERSION))
                    .withActivityId(UUID.randomUUID().toString())
                    .withInput(workflow_input);

            decisions.add(
                new Decision()
                    .withDecisionType(DecisionType.ScheduleActivityTask)
                    .withScheduleActivityTaskDecisionAttributes(attrs));
        }
        // Otherwise an instance of the activity is already scheduled or running (or scheduling
        // was rejected): respond with no decisions. Another decision task arrives once the
        // activity completes, fails or times out.

        System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions);
        swf.respondDecisionTaskCompleted(
            new RespondDecisionTaskCompletedRequest()
                .withTaskToken(taskToken)
                .withDecisions(decisions));
    }

    /** Marks the activity task scheduled by the given event id as no longer outstanding. */
    private static void closeActivity(List<Long> outstanding, Long scheduledEventId) {
        // The cast selects remove(Object); remove(int) would treat the id as a list index.
        outstanding.remove((Object) scheduledEventId);
    }
}
ActivityWorker:
/**
 * Activity worker for the hello_sample workflow.
 *
 * <p>Polls Amazon SWF for activity tasks on {@code Constants.TASKLIST}, runs each task and
 * reports the outcome (completed or failed) back to SWF. A JVM shutdown hook asks the poll
 * loop to stop and waits up to 60 seconds for it to exit.
 */
public class ActivityWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    // Counted down by main() once the poll loop has exited; the shutdown hook waits on it.
    private static final CountDownLatch waitForTermination = new CountDownLatch(1);
    // Set by the shutdown hook; checked by the poll loop before every poll.
    private static volatile boolean terminate = false;

    /**
     * Prints a greeting for the input and writes the input to {@code g_species.txt} in the
     * current working directory.
     *
     * <p>I/O failures (open, write, or flush on close) propagate to the caller so the task is
     * reported to SWF as failed instead of being reported as completed without the file.
     *
     * @param g_species the activity input
     * @return the input, unchanged, as the activity result
     * @throws Throwable if the file cannot be written
     */
    private static String executeActivityTask(String g_species) throws Throwable {
        String s = " ******** Hello, " + g_species + "!";
        System.out.println(s);

        Path filePath = Paths.get(".").toAbsolutePath().normalize().resolve("g_species.txt");

        // try-with-resources closes (and thereby flushes) the writer on every path.
        try (BufferedWriter output = new BufferedWriter(new FileWriter(filePath.toFile()))) {
            output.write(g_species);
        }

        return g_species;
    }

    /**
     * Registers the shutdown hook and runs the poll loop until termination is requested.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    terminate = true;
                    System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down.");
                    waitForTermination.await(60, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                    // Restore the interrupt status rather than dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        });

        try {
            pollAndExecute();
        }
        finally {
            // Release the shutdown hook even if the poll loop ended with an exception.
            waitForTermination.countDown();
        }
    }

    /**
     * Polls for activity tasks until {@code terminate} is set, executing each task and
     * responding to SWF with its result or with the failure reason.
     */
    public static void pollAndExecute() {
        while (!terminate) {
            System.out.println("ActivityWorker is polling for an activity task from the tasklist '"
                + Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'.");

            ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST)));

            String taskToken = task.getTaskToken();
            // No token means this poll delivered no task; loop and poll again.
            if (taskToken != null) {
                String result = null;
                Throwable error = null;

                try {
                    System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'.");
                    result = executeActivityTask(task.getInput());
                }
                catch (Throwable th) {
                    error = th;
                }

                if (error == null) {
                    System.out.println("The activity task succeeded with result '" + result + "'.");
                    swf.respondActivityTaskCompleted(
                        new RespondActivityTaskCompletedRequest()
                            .withTaskToken(taskToken)
                            .withResult(result));
                }
                else {
                    System.out.println("The activity task failed with the error '"
                        + error.getClass().getSimpleName() + "'.");
                    swf.respondActivityTaskFailed(
                        new RespondActivityTaskFailedRequest()
                            .withTaskToken(taskToken)
                            .withReason(error.getClass().getSimpleName())
                            .withDetails(error.getMessage()));
                }
            }
        }
    }
}
启动这一切的 WorkflowStarter:
/**
 * Starts one execution of the hello_sample workflow with a fixed workflow id.
 */
public class WorkflowStarter {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution";

    /**
     * Starts the workflow execution and prints its run id.
     *
     * @param args optional; {@code args[0]} replaces the default workflow input "Amazon SWF"
     */
    public static void main(String[] args) {
        String input = (args.length > 0) ? args[0] : "Amazon SWF";

        System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION +
            "' with input '" + input + "'.");

        WorkflowType type = new WorkflowType()
            .withName(Constants.WORKFLOW)
            .withVersion(Constants.WORKFLOW_VERSION);

        StartWorkflowExecutionRequest request = new StartWorkflowExecutionRequest()
            .withDomain(Constants.DOMAIN)
            .withWorkflowType(type)
            .withWorkflowId(WORKFLOW_EXECUTION)
            .withInput(input)
            // Start-to-close timeout for the whole execution, passed as a string.
            .withExecutionStartToCloseTimeout("90");

        Run started = swf.startWorkflowExecution(request);

        System.out.println("Workflow execution started with the run id '" +
            started.getRunId() + "'.");
    }
}
我建议不要重新发明轮子,使用亚马逊官方支持的AWS Flow Framework for Java。它已经实现了所有底层细节,并允许您直接关注工作流的业务逻辑。
这是一个使用三个活动的示例工作流(取自 developer guide)。
活动接口:
import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;
/**
 * Activities used by the greeter workflow, registered as version "1.0" with default
 * schedule-to-start and start-to-close timeouts of 300 and 10 seconds.
 */
@Activities(version = "1.0")
@ActivityRegistrationOptions(
    defaultTaskScheduleToStartTimeoutSeconds = 300,
    defaultTaskStartToCloseTimeoutSeconds = 10)
public interface GreeterActivities {

    /** Returns the name to greet. */
    String getName();

    /**
     * Builds the greeting text for a name.
     *
     * @param name the name to greet
     * @return the greeting
     */
    String getGreeting(String name);

    /**
     * Emits the given text.
     *
     * @param what the text to emit
     */
    void say(String what);
}
活动实现:
/**
 * Straightforward implementation of {@link GreeterActivities}: a fixed name, a "Hello "
 * prefix, and output to standard out.
 */
public class GreeterActivitiesImpl implements GreeterActivities {

    /** Returns the fixed name "World". */
    @Override
    public String getName() {
        final String fixedName = "World";
        return fixedName;
    }

    /** Returns {@code name} prefixed with "Hello ". */
    @Override
    public String getGreeting(String name) {
        final String greeting = "Hello " + name;
        return greeting;
    }

    /** Prints {@code what} on its own line to standard out. */
    @Override
    public void say(String what) {
        System.out.println(what);
    }
}
工作流接口:
import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;
/**
 * Workflow contract for the greeter example; executions default to a start-to-close
 * timeout of 3600 seconds.
 */
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600)
@Workflow
public interface GreeterWorkflow {

    /** Workflow entry point, declared as version "1.0". */
    @Execute(version = "1.0")
    void greet();
}
工作流实现:
import com.amazonaws.services.simpleworkflow.flow.core.Promise;
public class GreeterWorkflowImpl implements GreeterWorkflow {
private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl();
public void greet() {
Promise<String> name = operations.getName();
Promise<String> greeting = operations.getGreeting(name);
operations.say(greeting);
}
}
同时托管这两者的 worker。显然也可以把它拆分成单独的 activity worker 和 workflow worker:
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
/**
 * Hosts both the activity worker and the workflow worker for the greeter example in one
 * process, polling the same domain and task list.
 */
public class GreeterWorker {

    /**
     * Builds the SWF client from environment credentials and starts both workers.
     *
     * @param args unused
     * @throws Exception if client setup or worker start-up fails
     */
    public static void main(String[] args) throws Exception {
        final String domain = "helloWorldWalkthrough";
        final String taskListToPoll = "HelloWorldList";

        ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70 * 1000);

        // Credentials come from the environment.
        String accessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_KEY");
        AWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretKey);

        AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(credentials, clientConfig);
        service.setEndpoint("https://swf.us-east-1.amazonaws.com");

        // Activity worker is configured and started first, then the workflow worker.
        ActivityWorker activityWorker = new ActivityWorker(service, domain, taskListToPoll);
        activityWorker.addActivitiesImplementation(new GreeterActivitiesImpl());
        activityWorker.start();

        WorkflowWorker workflowWorker = new WorkflowWorker(service, domain, taskListToPoll);
        workflowWorker.addWorkflowImplementationType(GreeterWorkflowImpl.class);
        workflowWorker.start();
    }
}
工作流启动器:
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
/**
 * Starter for the greeter example: builds an SWF client and kicks off one workflow execution.
 */
public class GreeterMain {

    /**
     * Builds the SWF client from environment credentials and starts the workflow with the
     * workflow id "someID".
     *
     * @param args unused
     * @throws Exception if client setup or starting the workflow fails
     */
    public static void main(String[] args) throws Exception {
        final String domain = "helloWorldWalkthrough";

        ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70 * 1000);

        // Credentials come from the environment.
        String accessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_KEY");
        AWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretKey);

        AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(credentials, clientConfig);
        service.setEndpoint("https://swf.us-east-1.amazonaws.com");

        // NOTE(review): the external client factory types are not defined in this file —
        // presumably generated by the Flow framework from GreeterWorkflow; confirm.
        GreeterWorkflowClientExternalFactory clientFactory =
            new GreeterWorkflowClientExternalFactoryImpl(service, domain);
        GreeterWorkflowClientExternal workflowClient = clientFactory.getClient("someID");
        workflowClient.greet();
    }
}