如何在 Amazon SWF hello_sample 示例中添加第二个 activity

How to add second activity in Amazon SWF hello_sample example

我已经成功实施了名为 hello_sample 的简单 Java Amazon SWF 示例。我创建了 ActivityWorker 可执行文件,它轮询 SWF 以查找 activity 个要处理的任务。我创建了 WorkflowWorker 可执行文件来轮询 SWF 以执行决策任务,并且我有一个 WorkflowStarter 可执行文件来启动工作流执行。它像宣传的那样工作。我不明白的是如何在第一个 activity?
之后配置和添加第二个 activity 到 运行 工作流工作者:

public class WorkflowWorker {
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
public static void main(String[] args) {
    PollForDecisionTaskRequest task_request =
        new PollForDecisionTaskRequest()
            .withDomain(Constants.DOMAIN)
            .withTaskList(new TaskList().withName(Constants.TASKLIST));

    while (true) {
        System.out.println(
                "WorkflowWorker is polling for a decision task from the tasklist '" +
                Constants.TASKLIST + "' in the domain '" +
                Constants.DOMAIN + "'.");

        DecisionTask task = swf.pollForDecisionTask(task_request);

        String taskToken = task.getTaskToken();
        if (taskToken != null) {
            try {
                executeDecisionTask(taskToken, task.getEvents());
            }
            catch (Throwable th) {
                th.printStackTrace();
            }
        }
    }
}

private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable {
    List<Decision> decisions = new ArrayList<Decision>();
    String workflow_input = null;
    int scheduled_activities = 0;
    int open_activities = 0;
    boolean activity_completed = false;
    String result = null;

    System.out.println("WorkflowWorker is executing the decision task for the history events: [");
    for (HistoryEvent event : events) {
        System.out.println("  " + event);
        switch(event.getEventType()) {
            case "WorkflowExecutionStarted":
                workflow_input = event.getWorkflowExecutionStartedEventAttributes().getInput();
                break;
            case "ActivityTaskScheduled":
                scheduled_activities++;
                break;
            case "ScheduleActivityTaskFailed":
                scheduled_activities--;
                break;
            case "ActivityTaskStarted":
                scheduled_activities--;
                open_activities++;
                break;
            case "ActivityTaskCompleted":
                open_activities--;
                activity_completed = true;
                result = event.getActivityTaskCompletedEventAttributes().getResult();
                break;
            case "ActivityTaskFailed":
                open_activities--;
                break;
            case "ActivityTaskTimedOut":
                open_activities--;
                break;
        }
    }
    System.out.println("]");

    if (activity_completed) {
        decisions.add(
            new Decision()
                .withDecisionType(DecisionType.CompleteWorkflowExecution)
                .withCompleteWorkflowExecutionDecisionAttributes(
                    new CompleteWorkflowExecutionDecisionAttributes()
                        .withResult(result)));
    }
    else {
        if (open_activities == 0 && scheduled_activities == 0) {
            ScheduleActivityTaskDecisionAttributes attrs =
                new ScheduleActivityTaskDecisionAttributes()
                    .withActivityType(new ActivityType()
                        .withName(Constants.ACTIVITY)
                        .withVersion(Constants.ACTIVITY_VERSION))
                    .withActivityId(UUID.randomUUID().toString())
                    .withInput(workflow_input);

            decisions.add(
                    new Decision()
                        .withDecisionType(DecisionType.ScheduleActivityTask)
                        .withScheduleActivityTaskDecisionAttributes(attrs));
        }
        else {
            // an instance of HelloActivity is already scheduled or running. Do nothing, another
            // task will be scheduled once the activity completes, fails or times out
        }
    }

    System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions);
    swf.respondDecisionTaskCompleted(
        new RespondDecisionTaskCompletedRequest()
            .withTaskToken(taskToken)
            .withDecisions(decisions));
}

}

ActivityWorker:

public class ActivityWorker {
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
private static CountDownLatch waitForTermination = new CountDownLatch(1);
private static volatile boolean terminate = false;

private static String executeActivityTask(String g_species) throws Throwable {
    String s = "   ********   Hello, " + g_species + "!";
    System.out.println(s);

    String cwd = Paths.get(".").toAbsolutePath().normalize().toString();
    String filename = "g_species.txt";
    Path filePath = Paths.get(cwd, filename);
    String filePathName = filePath.toString();

    BufferedWriter output = null;
    try {
        File file = new File (filePathName);
        output = new BufferedWriter(new FileWriter(file));
        output.write(g_species);
    } 
    catch (IOException e) {
        e.printStackTrace();
    } 
    finally {
      if (output != null) {
        output.close();
      }
    }

    return g_species;
}

public static void main(String[] args) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            try {
                terminate = true;
                System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down.");
                waitForTermination.await(60, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // ignore
                System.out.println(e.getMessage());
            }
        }
    });
    try {
        pollAndExecute();
    }
    finally {
        waitForTermination.countDown();
    }
}

public static void pollAndExecute() {
    while (!terminate) {
        System.out.println("ActivityWorker is polling for an activity task from the tasklist '"
                + Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'.");

        ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest()
            .withDomain(Constants.DOMAIN)
            .withTaskList(new TaskList().withName(Constants.TASKLIST)));

        String taskToken = task.getTaskToken();

        if (taskToken != null) {
            String result = null;
            Throwable error = null;

            try {
                System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'.");
                result = executeActivityTask(task.getInput());
            }
            catch (Throwable th) {
                error = th;
            }

            if (error == null) {
                System.out.println("The activity task succeeded with result '" + result + "'.");
                swf.respondActivityTaskCompleted(
                    new RespondActivityTaskCompletedRequest()
                        .withTaskToken(taskToken)
                        .withResult(result));
            }
            else {
                System.out.println("The activity task failed with the error '"
                        + error.getClass().getSimpleName() + "'.");
                swf.respondActivityTaskFailed(
                    new RespondActivityTaskFailedRequest()
                        .withTaskToken(taskToken)
                        .withReason(error.getClass().getSimpleName())
                        .withDetails(error.getMessage()));
            }
        }
    }
}

}

启动这一切的 WorkflowStarter:

public class WorkflowStarter {
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution";

public static void main(String[] args) {

    String workflow_input = "Amazon SWF";
    if (args.length > 0) {
        workflow_input = args[0];
    }

    System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION +
            "' with input '" + workflow_input + "'.");

    WorkflowType wf_type = new WorkflowType()
        .withName(Constants.WORKFLOW)
        .withVersion(Constants.WORKFLOW_VERSION);

    Run run = swf.startWorkflowExecution(new StartWorkflowExecutionRequest()
        .withDomain(Constants.DOMAIN)
        .withWorkflowType(wf_type)
        .withWorkflowId(WORKFLOW_EXECUTION)
        .withInput(workflow_input)
        .withExecutionStartToCloseTimeout("90"));

    System.out.println("Workflow execution started with the run id '" +
            run.getRunId() + "'.");
}

}

我建议不要重新发明轮子,使用亚马逊官方支持的AWS Flow Framework for Java。它已经实现了所有底层细节,并允许您直接关注工作流的业务逻辑。

这是一个使用三个活动的示例工作流(取自 developer guide)。

活动界面:

import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;

@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 300,
                             defaultTaskStartToCloseTimeoutSeconds = 10)
@Activities(version="1.0")

public interface GreeterActivities {
   public String getName();
   public String getGreeting(String name);
   public void say(String what);
}

活动实施:

public class GreeterActivitiesImpl implements GreeterActivities {
   @Override
   public String getName() {
      return "World";
   }
   @Override
   public String getGreeting(String name) {
      return "Hello " + name;
   }
   @Override
   public void say(String what) {
      System.out.println(what);
   }
}

工作流界面:

import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;

@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600)
public interface GreeterWorkflow {
   @Execute(version = "1.0")
   public void greet();
}

工作流程实施:

import com.amazonaws.services.simpleworkflow.flow.core.Promise;

public class GreeterWorkflowImpl implements GreeterWorkflow {
   private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl();

   public void greet() {
     Promise<String> name = operations.getName();
     Promise<String> greeting = operations.getGreeting(name);
     operations.say(greeting);
   }
}

托管他们两个的工作人员。显然它可以分解成单独的 activity 和 workflow workers:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;

public class GreeterWorker  {
   public static void main(String[] args) throws Exception {
     ClientConfiguration config = new ClientConfiguration().withSocketTimeout(70*1000);

     String swfAccessId = System.getenv("AWS_ACCESS_KEY_ID");
     String swfSecretKey = System.getenv("AWS_SECRET_KEY");
     AWSCredentials awsCredentials = new BasicAWSCredentials(swfAccessId, swfSecretKey);

     AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(awsCredentials, config);
     service.setEndpoint("https://swf.us-east-1.amazonaws.com");

     String domain = "helloWorldWalkthrough";
     String taskListToPoll = "HelloWorldList";

     ActivityWorker aw = new ActivityWorker(service, domain, taskListToPoll);
     aw.addActivitiesImplementation(new GreeterActivitiesImpl());
     aw.start();

     WorkflowWorker wfw = new WorkflowWorker(service, domain, taskListToPoll);
     wfw.addWorkflowImplementationType(GreeterWorkflowImpl.class);
     wfw.start();
   }
}

工作流启动器:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;

public class GreeterMain {

   public static void main(String[] args) throws Exception {
     ClientConfiguration config = new ClientConfiguration().withSocketTimeout(70*1000);

     String swfAccessId = System.getenv("AWS_ACCESS_KEY_ID");
     String swfSecretKey = System.getenv("AWS_SECRET_KEY");
     AWSCredentials awsCredentials = new BasicAWSCredentials(swfAccessId, swfSecretKey);

     AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(awsCredentials, config);
     service.setEndpoint("https://swf.us-east-1.amazonaws.com");

     String domain = "helloWorldWalkthrough";

     GreeterWorkflowClientExternalFactory factory = new GreeterWorkflowClientExternalFactoryImpl(service, domain);
     GreeterWorkflowClientExternal greeter = factory.getClient("someID");
     greeter.greet();
   }
}