gpt4 book ai didi

java - CompletableFuture 异常处理 runAsync & thenRun

转载 作者:行者123 更新时间:2023-11-30 01:41:09 28 4
gpt4 key购买 nike

假设我有这个示例代码,并且在 runAsync 中遇到异常。我的问题是这个异常是否会阻止 thenRun 被执行,因为 thenRun 与此代码的调用者方法在同一线程中运行。

private void caller() {
CompletableFuture.runAsync(() -> {
try {
// some code
} catch (Exception e) {
throw new CustomException(errorMessage, e);
}
}, anInstanceOfTaskExecutor).thenRun(
// thenRun code
));
}

我已经完成了 this 线程,它解释了如何处理从异步块抛出的异常(即通过阻塞和使用 join )。我想知道 thenRun 块中的代码是否会被执行,如果 CompletableFuture completesExceptionally

更新 :

我运行了一些代码来测试这个:
CompletableFuture.runAsync(() -> {
List<Integer> integerList = new ArrayList<>();
integerList.get(1); // throws exception
}).thenRun(() -> {
System.out.println("No exception occurred");
});

它不打印任何内容,这意味着异常不会从异步块“传播到/到达”调用方方法的线程。我现在了解这里的预期行为,但我有以下问题:
  • 为什么即使 CompletableFuture 异常完成,它仍然默默地失败?
  • 在后台是如何工作的?
  • 是不是因为这两个线程(调用者的线程和异步线程)都有自己的堆栈空间?
  • 最佳答案

    一般信息

    CompletionStage 的文档解释了接口(interface)的一般规则:

    A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. The functionality defined in this interface takes only a few basic forms, which expand out to a larger set of methods to capture a range of usage styles:

    • The computation performed by a stage may be expressed as a Function, Consumer, or Runnable (using methods with names including apply, accept, or run, respectively) depending on whether it requires arguments and/or produces results. For example:

      stage.thenApply(x -> square(x))
      .thenAccept(x -> System.out.print(x))
      .thenRun(() -> System.out.println());

      An additional form (compose) allows the construction of computation pipelines from functions returning completion stages.

      Any argument to a stage's computation is the outcome of a triggering stage's computation.

    • One stage's execution may be triggered by completion of a single stage, or both of two stages, or either of two stages. Dependencies on a single stage are arranged using methods with prefix then. Those triggered by completion of both of two stages may combine their results or effects, using correspondingly named methods. Those triggered by either of two stages make no guarantees about which of the results or effects are used for the dependent stage's computation.

    • Dependencies among stages control the triggering of computations, but do not otherwise guarantee any particular ordering. Additionally, execution of a new stage's computations may be arranged in any of three ways: default execution, default asynchronous execution (using methods with suffix async that employ the stage's default asynchronous execution facility), or custom (via a supplied Executor). The execution properties of default and async modes are specified by CompletionStage implementations, not this interface. Methods with explicit Executor arguments may have arbitrary execution properties, and might not even support concurrent execution, but are arranged for processing in a way that accommodates asynchrony.

    • Two method forms (handle and whenComplete) support unconditional computation whether the triggering stage completed normally or exceptionally. Method exceptionally supports computation only when the triggering stage completes exceptionally, computing a replacement result, similarly to the java [sic] catch keyword. In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause. If a stage is dependent on both of two stages, and both complete exceptionally, then the CompletionException may correspond to either one of these exceptions. If a stage is dependent on either of two others, and only one of them completes exceptionally, no guarantees are made about whether the dependent stage completes normally or exceptionally. In the case of method whenComplete, when the supplied action itself encounters an exception, then the stage completes exceptionally with this exception unless the source stage also completed exceptionally, in which case the exceptional completion from the source stage is given preference and propagated to the dependent stage.

    All methods adhere to the above triggering, execution, and exceptional completion specifications (which are not repeated in individual method specifications). [...]

    [...]



    CompletableFuture 的文档解释了线程规则(和其他策略),如上所述,其中一些由 CompletionStage 的实现决定:

    A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

    When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.

    In addition to these and related methods for directly manipulating status and results, CompletableFuture implements interface CompletionStage with the following policies:

    • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

    • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). This may be overridden for non-static methods in subclasses by defining method defaultExecutor(). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask. Operations with time-delays can use adapter methods defined in this class, for example: supplyAsync(supplier, delayedExecutor(timeout, timeUnit)). To support methods with delays and timeouts, this class maintains at most one daemon thread for triggering and cancelling actions, not for running them.

    • All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.

    • All CompletionStage methods return CompletableFutures. To restrict usages to only those methods defined in interface CompletionStage, use method minimalCompletionStage(). Or to ensure only that clients do not themselves modify a future, use method copy().

    CompletableFuture also implements Future with the following policies:

    • Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()). Method isCompletedExceptionally() can be used to determine if a CompletableFuture completed in any exceptional fashion.

    • In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException. To simplify usage in most contexts, this class also defines methods join() and getNow(T) that instead throw the CompletionException directly in these cases.

    [...]



    你的问题

    这是您的示例代码:

    CompletableFuture.runAsync(() -> {
    List<Integer> integerList = new ArrayList<>();
    integerList.get(1); // throws exception
    }).thenRun(() -> {
    System.out.println("No exception occurred");
    });

    如果您不知道,诸如 thenRun 之类的方法会返回一个新的 CompletionStage 。所以你的代码类似于以下内容:

    CompletableFuture<Void> runAsyncStage = CompletableFuture.runAsync(() -> List.of().get(0));
    CompletableFuture<Void> thenRunStage =
    runAsyncStage.thenRun(() -> System.out.println("thenRun executing!"));
    thenRunStagerunAsyncStage 的完成触发,在这种情况下,保证以 IndexOutOfBoundsException 异常完成。至于为什么不执行 Runnable ,那是由于 CompletionStage#thenRun(Runnable) 的契约:

    Returns a new CompletionStage that, when this stage completes normally, executes the given action. See the CompletionStage documentation for rules covering exceptional completion.



    由于触发阶段异常完成, thenRunStage阶段也异常完成,即跳过 Runnable

    1. “为什么即使 CompletableFuture 异常完成,它仍然默默地失败?”

    示例代码相当于用 try-catch 块吞下异常。您没有看到异常,因为您还没有编写报告异常的代码。 runAsyncStagethenRunStage 阶段都异常完成,后者是因为前者异常完成。

    如果您想了解阶段“链内”的异常,那么您必须使用阶段,例如 exceptionally[Async]handle[Async]whenComplete[Async] 。通过这种方式,您可以根据触发阶段的正常或异常完成来更改链的行为。

    如果您想知道阶段的“链外”异常,那么您必须使用诸如 join()get()get(long,TimeUnit) 之类的方法。如果该阶段异常完成,则第一个将抛出一个 CompletionException 包装失败的原因,而后两个将抛出一个 ExecutionException 包装失败的原因。

    2.“它在后台是如何工作的?”
    CompletableFuture 的实现太复杂,无法在 Stack Overflow 答案中解释。如果你想研究实现,你可以查看源代码。您的 JDK 应该带有一个包含 Java 源文件的 src.zip 文件。您也可以在 OpenJDK repositories 中在线查看源代码。例如,这是 CompletableFuture 的 JDK 13 源代码:

    https://hg.openjdk.java.net/jdk/jdk13/file/0368f3a073a9/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

    3.“是不是因为这两个线程(调用者的线程和异步线程)都有自己的堆栈空间?”

    除非两个线程之间存在某种通信,否则一个线程不会意识到另一个线程中的异常。诸如 join() 之类的调用方法将在适当的时候将异常传达给将抛出所述异常的调用线程。但是,正如您对第一个问题的回答所示,它比这稍微复杂一些。即使线程在单个阶段抛出异常,您也不会看到堆栈跟踪或任何类似内容。这是因为异常被捕获并且阶段被标记为失败,并以该异常作为原因。然后,其他代码必须根据需要显式检索和处理该异常。

    这与使用 ExecutorService 并返回 Future 对象没有什么不同。任务可能会在后台失败,但其他代码在查询 Future 之前不会意识到这一点。

    来自赏金:“我希望了解线程如何相互交互的细节。”

    我不知道还有什么要补充的。 CompletionStage API 是一个“高于”线程的抽象。您只需告诉 API 您希望命令链如何执行,包括每个阶段使用哪些线程池,并且实现会为您处理所有线程间通信。也就是说,每个线程都做自己的事情,只是 API 旨在提供一种更简单和 react 性的方式来在线程之间进行通信。如果您对它的实现方式感兴趣,那么我建议您研究源代码(上面链接)。

    关于java - CompletableFuture 异常处理 runAsync & thenRun,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59941401/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com