gpt4 book ai didi

java - 在申请 CompletableFutures 的函数中抛出异常

转载 作者:行者123 更新时间:2023-12-02 10:35:46 24 4
gpt4 key购买 nike

我创建了一些任务,如下所示(这仅用于演示通常的网络调用):

public class RandomTask implements Function<String, String> {
private int number;
private int waitTime;
private boolean throwError;

public RandomTask(int number, int waitTime, boolean throwError) {
this.number = number;
this.waitTime = waitTime;
this.throwError = throwError;
}

@Override
public String apply(String s) {
System.out.println("Job " + number + " started");
try {
Thread.sleep(waitTime);

if (throwError) {
throw new InterruptedException("Something happened");
}

} catch (InterruptedException e) {
System.out.println("Error " + e.getLocalizedMessage());
}

return "RandomTask " + number + " finished";
}
}

然后我有一个 Chain 类,我将每个作业的一些任务链接在一起。

static CompletableFuture<String> start(ExecutorService executorService) {
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Foo", executorService)
.thenApplyAsync(new RandomTask(3, 100, false), executorService)
.thenApplyAsync(new RandomTask(4, 100, false), executorService);

return future2;
}

然后我启动 2 个链,如下所示:

  CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(Chain1.start(fixedThreadPool), Chain2.start(fixedThreadPool));
try {
combinedFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

这样两条链就会同时启动。

现在我想在任务中抛出异常并在调用combinedFuture.get()的地方捕获它,以便我知道我的链中哪个任务失败了。

问题是我无法调整该函数,因为 CompletableFutures 对此有所提示。我尝试过:

@FunctionalInterface
public interface CheckedFunction<T, R> {
R apply(T t) throws InterruptedException;
}

但这行不通。这是不可能的还是我怎样才能实现我的目标?

最佳答案

“这样两条链就会同时启动。”表明您对 CompletableFuture 的理解根本错误有效。

异步操作会在您创建异步操作时或在其先决条件可用时立即提交给执行程序服务。所以在 supplyAsync 的情况下,它没有依赖关系,异步操作就在 supplyAsync 内开始。调用。

All,类似 CompletableFuture.allOf(job1, job2).get() 的结构所做的,是根据两个作业创建一个新阶段并等待其完成,因此最终结果只是等待两个作业的完成。它启 Action 业。他们已经在奔跑了。等待完成对完成过程没有影响。

链接 CompletableFuture使用允许检查异常的自定义函数类型可以完成如下

public static <T,R> CompletableFuture<R> thenApplyAsync(
CompletableFuture<T> f, CheckedFunction<? super T, ? extends R> cf,
Executor e) {

CompletableFuture<R> r = new CompletableFuture<>();
f.whenCompleteAsync((v,t) -> {
try {
if(t != null) r.completeExceptionally(t);
else r.complete(cf.apply(v));
} catch(Throwable t2) {
r.completeExceptionally(t2);
}
}, e);
return r;
}

要使用此方法,而不是对 CompletableFuture 进行链接调用,你必须嵌套它们。例如

static CompletableFuture<String> start(ExecutorService executorService) {
CompletableFuture<String> future2 =
thenApplyAsync(thenApplyAsync(
CompletableFuture.supplyAsync(() -> "Foo", executorService),
new RandomTask(3, 100, false), executorService),
new RandomTask(4, 100, false), executorService);

return future2;
}

给定

public class RandomTask implements CheckedFunction<String, String> {
private int number, waitTime;
private boolean throwError;

public RandomTask(int number, int waitTime, boolean throwError) {
this.number = number;
this.waitTime = waitTime;
this.throwError = throwError;
}

@Override
public String apply(String s) throws InterruptedException {
System.out.println("Job " + number + " started");
Thread.sleep(waitTime);
if (throwError) {
throw new InterruptedException("Something happened in "+number);
}
return "RandomTask " + number + " finished";
}
}

您仍然可以创建两个任务并等待两个任务,例如

CompletableFuture.allOf(Chain1.start(fixedThreadPool), Chain2.start(fixedThreadPool))
.join();

关于java - 在申请 CompletableFutures 的函数中抛出异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53307275/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com