gpt4 book ai didi

java - Flux 未在 Spring 5 react 器中订阅

转载 作者:搜寻专家 更新时间:2023-11-01 01:31:18 25 4
gpt4 key购买 nike

我可能遗漏了什么,但我不知道是什么。

下面的代码什么都不做:

webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();

如果我尝试阻止调用,它会正常工作:

webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();

奇怪的是,如果我“手动”创建一个 Flux(即不是来自 spring webClient),这工作正常:

Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();

有人可以解释一下我做错了什么吗? .subscribe() 不是应该在第一种情况下执行操作,就像在最后一种情况下一样吗?

谢谢!

最佳答案

简答题

subscribe 不会阻塞当前线程,这意味着应用程序主线程可以在 Flux 发出任何元素之前完成。因此,要么使用 block,要么在主线程中等待。

详情

调用无参数 subscribe()只是在 Flux 上发出 request(unbounded) 而无需设置任何 Subscriber。它通常在单独的线程 中触发操作,但不会阻塞当前线程。最有可能的是,您的主线程在 WebClient 收到该单独线程中的响应和 passive side effect doOnNext(...) 之前结束。发生了。

为了说明/测试操作已启动,请在主线程中等待一段时间。只需在 subscribe() 调用之后添加以下行:

Thread.sleep(1000);

现在,在设置超时值后,您将能够看到打印的结果。

现在让我们为异步操作隐式发送自定义 Scheduler 并等待其所有任务完成。此外,让我们将 System.out::println 作为 subscribe(...) 参数而不是 doOnNext 传递,以便完整代码显示为如下:

ExecutorService executor = Executors.newSingleThreadExecutor(); 

webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking

executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread

此示例使用略有不同的 subscribe(Consumer) .最重要的是,它添加了 publishOn(Scheduler)它由 ExecutorService 支持。后者用于等待主线程中的终止。

当然,实现相同结果的更简单方法是使用您最初提到的 block():

webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();

最后,请注意您的第三个示例 Flux.just(...)...subscribe() - 似乎它只是在您的主线程终止之前快速完成。这是因为与发射单个 GetLocationsResponse 元素相比,发射几个 String 元素所需的时间更少(暗示写入请求+读取响应+解析到 POJO 的时间) .但是,如果您使此 Flux 延迟元素,您将重现相同的行为:

Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();


Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again

关于java - Flux 未在 Spring 5 react 器中订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52375697/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com