- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在考虑数据流语言的想法,并尝试使用执行程序服务将可用线程的数量限制为系统上的核心数量,并创建一个共享线程池。
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Function a = new Function("A", service);
Function b = new Function("B", service);
Function c = new Function("C", service);
a.addObserver(b);
a.addObserver(c);
a.change()
A 发生更改,导致它调用它的 evaluate()
,然后 B 和 C 被通知需要更新,并且他们会这样做。
在Function.java
中我有:
public class Function implements Func{
private boolean isComplete;
private Set<Func> dependsOn;
private Set<Func> observedBy;
private ExecutorService service;
public Function(String name, ExecutorService service){
observedBy = new HashSet<>();
dependsOn = new HashSet<>();
this.isComplete = false;
this.service = service;
}
public void addObserver(Func f){
observedBy.add(f);
f.addDependency(this);
}
private void evaluate() {
boolean match = dependsOn.stream().allMatch(Func::isComplete);
if (match) {
this.isComplete = false;
// inform each observer that I'm about to start to evaluate
observedBy.forEach(func -> func.onDependentEvaluate(this));
CompletableFuture.runAsync(() -> {
try {
System.out.println("Doing some work");
int sleepTime = (int) (Math.random() * 5000);
System.out.println("Working for " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, service).thenRun(this::onComplete);
}
}
public void addDependency(Func f){
dependsOn.add(f);
}
@Override
public void change() {;
evaluate();
}
@Override
public void onComplete() {
this.isComplete = true;
observedBy.forEach((f) -> f.onDependentComplete(this));
}
@Override
public void onDependentEvaluate(Func f) {
}
@Override
public void onDependentComplete(Func func){
evaluate();
}
@Override
public boolean isComplete() {
return this.isComplete;
}
}
当我将 service
传递给 runAsync
时,只有 A 执行。如果我不传递 service
并且 CompletableFuture 使用 ForkJoin.commonPool,则该图将按我的预期执行。是否有其他方法来共享 ExecutorService ?知道为什么下一个 Future 没有被执行吗?
最佳答案
你的问题比你想象的要简单得多。 tl;dr 您唯一的问题是您没有关闭 ExecutorService
,因此您的程序不会终止。
长版本:在我的环境中,该程序按预期工作,并且任务 B
和 C
实际上运行。我重构了您的代码,以允许我创建一个 AbstractFunction
,允许我覆盖 Function
实际执行的行为,例如
public abstract class AbstractFunction implements Func {
protected boolean isComplete;
protected Set<Func> dependsOn;
protected Set<Func> observedBy;
protected ExecutorService service;
protected String name;
public AbstractFunction(String name, ExecutorService service) {
observedBy = new HashSet<>();
dependsOn = new HashSet<>();
this.isComplete = false;
this.service = service;
this.name = name;
}
public void addObserver(Func f) {
observedBy.add(f);
f.addDependency(this);
}
public void addDependency(Func f) {
dependsOn.add(f);
}
@Override
public void onComplete() {
this.isComplete = true;
observedBy.forEach((f) -> f.onDependentComplete(this));
}
@Override
public boolean isComplete() {
return this.isComplete;
}
@Override
public void change() { }
@Override
public void onDependentEvaluate(Func f) { }
@Override
public void onDependentComplete(Func func) { }
}
因此,您原来的(实际上未更改)Function
现在是这样的:
public class Function extends AbstractFunction implements Func {
public Function(String name, ExecutorService service) {
super(name, service);
}
protected void evaluate() {
boolean match = dependsOn.stream().allMatch(Func::isComplete);
if (match) {
this.isComplete = false;
// inform each observer that I'm about to start to evaluate
observedBy.forEach(func -> func.onDependentEvaluate(this));
CompletableFuture.runAsync(() -> {
try {
System.out.println("Doing some work");
int sleepTime = (int) (Math.random() * 5000);
System.out.println("Working for " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
System.out.println("I was interrupted, my name is: " + name);
e.printStackTrace();
}
}, service).thenRun(this::onComplete);
}
}
@Override
public void change() {
evaluate();
}
@Override
public void onDependentComplete(Func f) {
evaluate();
}
}
鉴于这个新结构,我创建了一个 ShutdownFunc
来监听并在 "C"
完成时关闭执行程序服务,即
public class ShutdownFunc extends AbstractFunction {
public ShutdownFunc(String name, ExecutorService service) {
super(name, service);
}
@Override
public void onDependentComplete(Func f) {
System.out.println("Shutting down the system");
try {
service.shutdown();
service.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
当然还有主要内容:
public class Main {
public static void main(String... args) {
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
Function a = new Function("A", service);
Function b = new Function("B", service);
Function c = new Function("C", service);
a.addObserver(b);
a.addObserver(c);
Func d = new ShutdownFunc("D", service);
c.addObserver(d);
a.change();
}
}
输出:
Doing some work
Working for 4315
Doing some work
Working for 2074
Doing some work
Working for 2884
Shutting down the system
程序正确地等待完成,然后正常终止。
关于java - 为什么这个 CompletableFuture 不使用共享的 ExecutorService 执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31973629/
我的应用程序中有 3 种不同的方法。全部返回CompletableFuture .我想并行执行方法 1 和 2。完成方法 1 和 2 后,我想使用方法 1 和 2 返回值的参数触发方法 3。 代码示例
我正在尝试加快对多个 API 的调用。 在下面的代码中,getFilteredEvents是当前同步版本。我有这样的感觉 map(x -> x.getFilteredEvents(eventResea
Q1。我的理解是,如果 future 正常或异常完成,就会调用 completableFuture.handle 。但是超时情况又如何呢? 第二季度。在哪里检查 completableFuture 的
我不明白这里发生了什么 CompletableFuture/supplyAsync。 如果我从先前实例化的 CompletableFuture 对象调用 supplyAsync 方法,它永远不会完成:
我有一个返回 CompletableFuture 的异步方法。 private CompletableFuture asyncA(..) 我公开了一个必须返回 CompleteableFuture 的
我在项目中找到了这段代码: int threadCount = 10; CompletableFuture[] futures = new CompletableFuture[threadCount]
让我们举个例子:我们有四种方法: CompletableFututre loadAndApply(SomeObject someObject); CompletableFuture loadData(
我有一个可完成的 future (future1),它创建了 10 个可完成的 future (futureN)。只有当所有 futureN 都完成时,有没有办法将 future1 设置为完成? 最佳
我想编写一个返回 CompletableFuture 的异步方法. future 的唯一目的是跟踪方法何时完成,而不是其结果。返回CompletableFuture会更好吗?或 Completable
我正在对我的数据库进行多次异步调用。我将所有这些异步调用存储在 List> list 上.我想一起收集所有结果,所以我需要等待所有这些调用完成。 一种方法是创建一个 CompletableFuture
我正在尝试使用 CompletableFuture 链接一些文件处理程序,它应该返回 CompletableFuture : CompletableFuture allGen = loadFile1(
我有 2 个 CompletableFuture。 task2 只能在 task1 完成后启动。然后,我需要等待所有任务完成。在下面的代码中,程序在 task1 结束后结束。 task2 开始但未完成
我有 2 个 CompletableFuture。 task2 只能在 task1 完成后启动。然后,我需要等待所有任务完成。在下面的代码中,程序在 task1 结束后结束。 task2 开始但未完成
我很难弄清楚这一点,并且可以向那些比我更有经验和知识的人寻求帮助。 基本问题是我需要获取对象列表,然后对于返回的每个对象,获取一些详细信息,并将详细信息缝合到对象中。我希望在这方面保持高效;我需要首先
我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共(public)线程池。这是代码片段的样子: final List>> completableF
为什么是CompletableFuture.allOf声明为 CompletableFuture而不是返回结果集合或其他东西?我认为制作 CompletableFuture.anyOf 是个好主意返回
我没有看到处理具有异步结果的异常的明显方法。 例如,如果我想重试一个异步操作,我会期待这样的事情: CompletionStage cf = askPong("cause error").handle
比如我有这样的方法: public CompletableFuture getPage(int i) { ... } public CompletableFuture getDocument(
我正在调用一个返回 CompletableFuture 的服务。 输出结构如下。 Class Output { public String name; public Integer a
我正在尝试转换 List>至CompletableFuture> .当您有许多异步任务并且需要获取所有这些任务的结果时,这非常有用。 如果其中任何一个失败,那么最终的 future 将失败。这就是我的
我是一名优秀的程序员,十分优秀!