gpt4 book ai didi

Java CompletableFuture.complete() block

转载 作者:搜寻专家 更新时间:2023-11-01 02:38:26 24 4
gpt4 key购买 nike

我在 java 中使用 CompletableFuture 时遇到问题。我有 2 个选择请求,这些请求在收到来自服务器的响应时被填充。

在连接线程(THREAD-1)(使用reactor)中,我使用:

if(hasException) {
selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
System.out.println("Before complete future");
selectFuture.complete(result);
System.out.println("After complete future");
}

在其他线程 (THREAD-2) 中,我使用:

CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
System.out.println("Receive all future");
// Do sth here
});

我的情况是,系统打印出“Receive all future”,但调用 future.complete(result); 时 THREAD-1 被阻塞,它无法退出该命令。如果在 THREAD-2 中,我使用 CompletableFuture.allOf(allOfSelect).get(),THREAD-1 将正确运行。但是使用 CompletableFuture.get() 会降低性能,所以我想使用 CompletableFuture.whenComplete()

谁能帮我解释一下阻塞的原因?

谢谢!

最佳答案

complete 调用触发所有相关的 CompletionStage

因此,如果您之前使用 whenComplete 注册了一个 BiConsumercomplete 将在其调用线程中调用它。在您的情况下,当您传递给 whenCompleteBiConsumer 完成时,对 complete 的调用将返回。这在 class javadoc 中有描述。

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

(另一个调用者是相反的情况,调用whenComplete的线程实际上会应用BiConsumer if 目标 CompletableFuture 已经完成。)

这里有一个小程序来说明这个行为:

public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
future.whenComplete((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});

System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
}

这将打印

Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done

显示 BiConsumer 被应用到主线程中,即调用 complete 的线程。

您可以使用 whenCompleteAsync强制在单独的线程中执行 BiConsumer

[...] that executes the given action using this stage's default asynchronous execution facility when this stage completes.

例如,

public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});

System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
done.get();
}

将打印

Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]

表明 BiConsumer 是在一个单独的线程中应用的。

关于Java CompletableFuture.complete() block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40393489/

24 4 0