gpt4 book ai didi

java - 工作流设计模式与任务模式相结合?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:51:23 24 4
gpt4 key购买 nike

我目前正在开发一个执行长期非线性任务的企业应用程序。

工作流的抽象:

  1. 收集必要的信息(可能需要几分钟,但并非总是必要)
  2. 处理数据(总是需要很长时间)
  3. 通知几个对结果进行后处理的工作人员(在新任务中)

现在,我已经创建了 2 个服务,可以解决第 1 步和第 2 步。由于服务不应该相互了解,我想要一个更高阶的组件来协调任务的 3 个步骤。将其视为一个 Callable,它将任务发送到服务一,当服务 1 返回结果时再次唤醒,将其发送到服务 2,...,将最终结果发送到所有后处理器并结束任务。但由于它可能有 100'000 个排队任务,我不想用任务控制可调用项启动 100'000 个线程,即使像 99.9% 的时间一样空闲仍然是一个巨大的开销。

那么有人知道如何控制封装在任务对象中的生产者消费者队列式模式,或者有人知道一个框架可以简化我的关注点吗?

最佳答案

除了 actor 框架之外,我还建议使用普通旧 Java 的两种主要方法:

  • 使用我们向其提交任务的 ExecutorService。可以使用 Future 同步正确的步骤顺序对象。可以使用 Phaser 同步整个任务集a 如下所示。

  • 使用 Fork/Join framework

这是一个使用简单执行器服务的示例。 Workflow 类被赋予一个执行器和一个移相器(同步屏障)。每次执行工作流时,它都会为每个步骤(即数据收集、处理和后处理)提交一个新任务。每个任务都使用这些移相器来指示它何时开始和停止。

public class Workflow {

private final ExecutorService executor;
private final Phaser phaser;

public Workflow(ExecutorService executor, Phaser phaser) {
this.executor = executor;
this.phaser = phaser;
}

public void execute(int request) throws InterruptedException, ExecutionException {
executor.submit(() -> {
phaser.register();
// Data collection
Future<Integer> input = executor.submit(() -> {
phaser.register();
System.out.println("Gathering data for call " + request);
phaser.arrive();
return request;
});
// Data Processing
Future<Integer> result = executor.submit(() -> {
phaser.register();
System.out.println("Processing call " + request);
Thread.sleep(5000);
phaser.arrive();
return request;
});
// Post processing
Future<Integer> ack = executor.submit(() -> {
phaser.register();
System.out.println("Notyfing processors for call " + request);
phaser.arrive();
return request;
});
final Integer output = ack.get();
phaser.arrive();
return output;
});
}

}

在关闭执行器之前,调用者对象使用移相器对象来了解所有子任务(步骤)何时完成。

public static void main(String[] args) throws InterruptedException, ExecutionException {
final Phaser phaser = new Phaser();
final ExecutorService executor = Executors.newCachedThreadPool();
Workflow workflow = new Workflow(executor, phaser);

phaser.register();
for (int request=0 ; request<10 ; request++) {
workflow.execute(request);
}

phaser.arriveAndAwaitAdvance();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}

关于java - 工作流设计模式与任务模式相结合?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41880425/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com