gpt4 book ai didi

java - 如何在 Amazon SWF hello_sample 示例中添加第二个 Activity

转载 作者:行者123 更新时间:2023-12-01 09:11:09 25 4
gpt4 key购买 nike

我已成功实现了名为 hello_sample 的简单 Java Amazon SWF 示例。我创建了 ActivityWorker 可执行文件,用于轮询 SWF 以查找要处理的 Activity 任务。我创建了 WorkflowWorker 可执行文件,用于轮询 SWF 以确定任务,并且我有一个 WorkflowStarter 可执行文件,用于启动工作流执行。它的作用正如广告所宣传的那样。我不明白的是如何配置和添加第二个 Activity 以在第一个 Activity 之后运行?
工作流程 worker :

public class WorkflowWorker {
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
public static void main(String[] args) {
PollForDecisionTaskRequest task_request =
new PollForDecisionTaskRequest()
.withDomain(Constants.DOMAIN)
.withTaskList(new TaskList().withName(Constants.TASKLIST));

while (true) {
System.out.println(
"WorkflowWorker is polling for a decision task from the tasklist '" +
Constants.TASKLIST + "' in the domain '" +
Constants.DOMAIN + "'.");

DecisionTask task = swf.pollForDecisionTask(task_request);

String taskToken = task.getTaskToken();
if (taskToken != null) {
try {
executeDecisionTask(taskToken, task.getEvents());
}
catch (Throwable th) {
th.printStackTrace();
}
}
}
}

private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable {
List<Decision> decisions = new ArrayList<Decision>();
String workflow_input = null;
int scheduled_activities = 0;
int open_activities = 0;
boolean activity_completed = false;
String result = null;

System.out.println("WorkflowWorker is executing the decision task for the history events: [");
for (HistoryEvent event : events) {
System.out.println(" " + event);
switch(event.getEventType()) {
case "WorkflowExecutionStarted":
workflow_input = event.getWorkflowExecutionStartedEventAttributes().getInput();
break;
case "ActivityTaskScheduled":
scheduled_activities++;
break;
case "ScheduleActivityTaskFailed":
scheduled_activities--;
break;
case "ActivityTaskStarted":
scheduled_activities--;
open_activities++;
break;
case "ActivityTaskCompleted":
open_activities--;
activity_completed = true;
result = event.getActivityTaskCompletedEventAttributes().getResult();
break;
case "ActivityTaskFailed":
open_activities--;
break;
case "ActivityTaskTimedOut":
open_activities--;
break;
}
}
System.out.println("]");

if (activity_completed) {
decisions.add(
new Decision()
.withDecisionType(DecisionType.CompleteWorkflowExecution)
.withCompleteWorkflowExecutionDecisionAttributes(
new CompleteWorkflowExecutionDecisionAttributes()
.withResult(result)));
}
else {
if (open_activities == 0 && scheduled_activities == 0) {
ScheduleActivityTaskDecisionAttributes attrs =
new ScheduleActivityTaskDecisionAttributes()
.withActivityType(new ActivityType()
.withName(Constants.ACTIVITY)
.withVersion(Constants.ACTIVITY_VERSION))
.withActivityId(UUID.randomUUID().toString())
.withInput(workflow_input);

decisions.add(
new Decision()
.withDecisionType(DecisionType.ScheduleActivityTask)
.withScheduleActivityTaskDecisionAttributes(attrs));
}
else {
// an instance of HelloActivity is already scheduled or running. Do nothing, another
// task will be scheduled once the activity completes, fails or times out
}
}

System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions);
swf.respondDecisionTaskCompleted(
new RespondDecisionTaskCompletedRequest()
.withTaskToken(taskToken)
.withDecisions(decisions));
}

}

Activity 工作人员:

public class ActivityWorker {
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
private static CountDownLatch waitForTermination = new CountDownLatch(1);
private static volatile boolean terminate = false;

private static String executeActivityTask(String g_species) throws Throwable {
String s = " ******** Hello, " + g_species + "!";
System.out.println(s);

String cwd = Paths.get(".").toAbsolutePath().normalize().toString();
String filename = "g_species.txt";
Path filePath = Paths.get(cwd, filename);
String filePathName = filePath.toString();

BufferedWriter output = null;
try {
File file = new File (filePathName);
output = new BufferedWriter(new FileWriter(file));
output.write(g_species);
}
catch (IOException e) {
e.printStackTrace();
}
finally {
if (output != null) {
output.close();
}
}

return g_species;
}

public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
terminate = true;
System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down.");
waitForTermination.await(60, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
// ignore
System.out.println(e.getMessage());
}
}
});
try {
pollAndExecute();
}
finally {
waitForTermination.countDown();
}
}

public static void pollAndExecute() {
while (!terminate) {
System.out.println("ActivityWorker is polling for an activity task from the tasklist '"
+ Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'.");

ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest()
.withDomain(Constants.DOMAIN)
.withTaskList(new TaskList().withName(Constants.TASKLIST)));

String taskToken = task.getTaskToken();

if (taskToken != null) {
String result = null;
Throwable error = null;

try {
System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'.");
result = executeActivityTask(task.getInput());
}
catch (Throwable th) {
error = th;
}

if (error == null) {
System.out.println("The activity task succeeded with result '" + result + "'.");
swf.respondActivityTaskCompleted(
new RespondActivityTaskCompletedRequest()
.withTaskToken(taskToken)
.withResult(result));
}
else {
System.out.println("The activity task failed with the error '"
+ error.getClass().getSimpleName() + "'.");
swf.respondActivityTaskFailed(
new RespondActivityTaskFailedRequest()
.withTaskToken(taskToken)
.withReason(error.getClass().getSimpleName())
.withDetails(error.getMessage()));
}
}
}
}

}

WorkflowStarter 启动这一切:

public class WorkflowStarter {
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution";

public static void main(String[] args) {

String workflow_input = "Amazon SWF";
if (args.length > 0) {
workflow_input = args[0];
}

System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION +
"' with input '" + workflow_input + "'.");

WorkflowType wf_type = new WorkflowType()
.withName(Constants.WORKFLOW)
.withVersion(Constants.WORKFLOW_VERSION);

Run run = swf.startWorkflowExecution(new StartWorkflowExecutionRequest()
.withDomain(Constants.DOMAIN)
.withWorkflowType(wf_type)
.withWorkflowId(WORKFLOW_EXECUTION)
.withInput(workflow_input)
.withExecutionStartToCloseTimeout("90"));

System.out.println("Workflow execution started with the run id '" +
run.getRunId() + "'.");
}

}

最佳答案

我建议不要重新发明轮子并使用 AWS Flow Framework for Java这是亚马逊官方支持的。它已经实现了所有底层细节,并允许您直接关注工作流程的业务逻辑。

这是一个使用三个 Activity 的示例工作流程(取自 developer guide )。

Activity 界面:

import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;

@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 300,
defaultTaskStartToCloseTimeoutSeconds = 10)
@Activities(version="1.0")

public interface GreeterActivities {
public String getName();
public String getGreeting(String name);
public void say(String what);
}

Activity 实现:

public class GreeterActivitiesImpl implements GreeterActivities {
@Override
public String getName() {
return "World";
}
@Override
public String getGreeting(String name) {
return "Hello " + name;
}
@Override
public void say(String what) {
System.out.println(what);
}
}

工作流程界面:

import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;

@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600)
public interface GreeterWorkflow {
@Execute(version = "1.0")
public void greet();
}

工作流程实现:

import com.amazonaws.services.simpleworkflow.flow.core.Promise;

public class GreeterWorkflowImpl implements GreeterWorkflow {
private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl();

public void greet() {
Promise<String> name = operations.getName();
Promise<String> greeting = operations.getGreeting(name);
operations.say(greeting);
}
}

托管它们的工作人员。显然,它可以分为单独的 Activity 和工作流程工作人员:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;

public class GreeterWorker {
public static void main(String[] args) throws Exception {
ClientConfiguration config = new ClientConfiguration().withSocketTimeout(70*1000);

String swfAccessId = System.getenv("AWS_ACCESS_KEY_ID");
String swfSecretKey = System.getenv("AWS_SECRET_KEY");
AWSCredentials awsCredentials = new BasicAWSCredentials(swfAccessId, swfSecretKey);

AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(awsCredentials, config);
service.setEndpoint("https://swf.us-east-1.amazonaws.com");

String domain = "helloWorldWalkthrough";
String taskListToPoll = "HelloWorldList";

ActivityWorker aw = new ActivityWorker(service, domain, taskListToPoll);
aw.addActivitiesImplementation(new GreeterActivitiesImpl());
aw.start();

WorkflowWorker wfw = new WorkflowWorker(service, domain, taskListToPoll);
wfw.addWorkflowImplementationType(GreeterWorkflowImpl.class);
wfw.start();
}
}

工作流程启动器:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;

public class GreeterMain {

public static void main(String[] args) throws Exception {
ClientConfiguration config = new ClientConfiguration().withSocketTimeout(70*1000);

String swfAccessId = System.getenv("AWS_ACCESS_KEY_ID");
String swfSecretKey = System.getenv("AWS_SECRET_KEY");
AWSCredentials awsCredentials = new BasicAWSCredentials(swfAccessId, swfSecretKey);

AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(awsCredentials, config);
service.setEndpoint("https://swf.us-east-1.amazonaws.com");

String domain = "helloWorldWalkthrough";

GreeterWorkflowClientExternalFactory factory = new GreeterWorkflowClientExternalFactoryImpl(service, domain);
GreeterWorkflowClientExternal greeter = factory.getClient("someID");
greeter.greet();
}
}

关于java - 如何在 Amazon SWF hello_sample 示例中添加第二个 Activity ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40917325/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com