gpt4 book ai didi

java - Mono.toFuture() 是阻塞的吗?

转载 作者:行者123 更新时间:2023-11-30 01:44:49 27 4
gpt4 key购买 nike

来自Official Documentation of Mono#block()据说:

Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

因此可以肯定 block() 方法是阻塞的,并且在 block() 解析之前它不会执行下一行。

但我的困惑是,当我使用 toFuture() 时,我期望它是非阻塞的,但它的行为与 block 方法完全相同。并在 Documentation of Mono#toFuture()据称:

Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.

Mono#toFuture()

不太清楚。本文档中没有任何地方说 Mono#toFuture() 正在阻塞

  1. 请确认 toFuture() 方法是阻塞还是非阻塞?
  2. 另外,如果是非阻塞,那么哪个线程将负责执行CompletableFuture内的代码?

更新:添加代码片段

使用Mono.block()方法:

    long time = System.currentTimeMillis();
String block = Mono.fromCallable(() -> {
logger.debug("inside in fromCallable() block()");
//Upstream httpcall with apache httpClient().
// which takes atleast 1sec to complete.
return "Http response as string";
}).block();
logger.info("total time needed {}", (System.currentTimeMillis()-time));

return CompletableFuture.completedFuture(block);
<小时/>

使用Mono.ToFuture()方法:

    long time = System.currentTimeMillis();
CompletableFuture<String> toFuture = Mono.fromCallable(() -> {
logger.debug("inside in fromCallable() block()");
//Upstream httpcall with apache httpClient().
// which takes atleast 1sec to complete.
return "Http response as string";
}).toFuture();
logger.info("total time needed {}", (System.currentTimeMillis()-time));
return toFuture;

这两个代码片段的行为完全相同。

最佳答案

--编辑:我错了。 mono.toFuture() 不会阻塞 --

mono.toFuture() 不会阻塞。看看这个测试:

    @Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
System.out.println(LocalTime.now() + ": start");
Mono<String> mono = Mono.just("hello StackOverflow")
.delayElement(Duration.ofMillis(500))
.doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
Future<String> future = mono.toFuture();
System.out.println(LocalTime.now() + ": future created");
String result = future.get();
System.out.println(LocalTime.now() + ": future completed");
assertThat(result).isEqualTo("hello StackOverflow");
}

这是结果:

20:18:49.557: start
20:18:49.575: future created
20:18:50.088: mono completed
20:18:50.088: future completed

future 几乎立刻就被创造出来了。半秒后,单声道完成,紧接着, future 完成。这正是我所期望发生的事情。

那么为什么单声道在问题中提供的示例中看起来会阻塞?这是因为 mono.fromCallable() 的工作方式。该可调用实际运行的时间和地点? mono.fromCallable() 不会产生额外的线程来完成这项工作。从我的测试来看,当您第一次在单声道上调用 subscribe() 或 block() 或类似的东西时,可调用对象似乎会运行,并且它将在执行该操作的线程中运行。

这是一个测试,表明如果您使用 fromCallable() 创建一个 mono,subscribe 将导致可调用对象在主线程中执行,甚至 subscribe() 方法也会显得阻塞。

    @Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
System.out.println(LocalTime.now() + ": start");
System.out.println("main thread: " + Thread.currentThread().getName());
Mono<String> mono = Mono.fromCallable(() -> {
System.out.println("callabel running in thread: " + Thread.currentThread().getName());
Thread.sleep(1000);
return "Hello StackOverflow";
})
.doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
System.out.println("before subscribe");
mono.subscribe(System.out::println);
System.out.println(LocalTime.now() + ": after subscribe");
}

结果:

20:53:37.071: start
main thread: main
before subscribe
callabel running in thread: main
20:53:38.099: mono completed
Hello StackOverflow
20:53:38.100: after subscribe

结论:mono.toFuture() 并不比 mono.subscribe() 更具阻塞性。如果您想异步执行某些代码,则不应使用 Mono.fromCallable()。您可以考虑使用 Executors.newSingleThreadExecutor().submit(someCallable)

作为引用,这是我最初的(错误的)答案,我贬低了 mono.block() 方法,该方法肯定是由比我更了解 Java 和编码的人编写的。我想这是关于谦逊的个人教训。

下面的一切都是废话

我想验证它到底是如何工作的,所以我编写了一些测试。不幸的是,事实证明 mono.toFuture() 确实是阻塞的,并且结果是同步计算的。老实说,我不知道你为什么会使用这个功能。 Future 的全部意义在于保存异步计算的结果。

这是我的测试:

@Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
Mono<Integer> mono = Mono.fromCallable(() -> {
System.out.println("start mono");
Thread.sleep(1000);
System.out.println("mono completed");
return 0;
});
Future<Integer> future = mono.toFuture();
System.out.println("future created");
future.get();
System.out.println("future completed");
}

结果:

start mono
mono completed
future created
future completed

这是 monoToFuture() 的实现,其工作方式与我期望的方式相同:

@Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
Mono<Integer> mono = Mono.fromCallable(() -> {
System.out.println("start mono");
Thread.sleep(1000);
System.out.println("mono completed");
return 0;
});
Future<Integer> future = monoToFuture(mono, Executors.newSingleThreadExecutor());
System.out.println("future created");
future.get();
System.out.println("future completed");
}

private <T> Future<T> monoToFuture(Mono<T> mono, ExecutorService executorService){
return executorService.submit((Callable<T>) mono::block);
}

结果:

future created
start mono
mono completed
future completed

关于java - Mono.toFuture() 是阻塞的吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58504527/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com