gpt4 book ai didi

spring-webflux - 如何在订阅中用阻塞操作包装 Flux?

转载 作者:行者123 更新时间:2023-12-02 14:57:55 25 4
gpt4 key购买 nike

文档中写道,您应该将阻塞代码包装到 Mono 中:http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

但它并没有写出实际如何去做。

我有以下代码:

@PostMapping(path = "some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doeSomething(@Valid @RequestBody Flux<Something> something) {
something.subscribe(something -> {
// some blocking operation
});

// how to return Mono<Void> here?
}

我在这里遇到的第一个问题是我需要归还一些东西,但我不能。例如,如果我返回一个 Mono.empty,请求将在通量工作完成之前关闭。

第二个问题是:如何像文档中建议的那样实际包装阻塞代码:

Mono blockingWrapper = Mono.fromCallable(() -> { 
return /* make a remote synchronous call */
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());

最佳答案

您不应在 Controller 处理程序中调用 subscribe,而应构建响应式(Reactive)管道并返回它。最终,HTTP 客户端将请求数据(通过 Spring WebFlux 引擎),这就是向管道订阅和请求数据的原因。

手动订阅会将请求处理与其他操作分离,这将 1) 取消对操作顺序的任何保证,以及 2) 如果其他操作正在使用 HTTP 资源(例如请求正文),则中断处理。

在这种情况下,源不是阻塞的,只有变换操作是阻塞的。所以我们最好使用 publishOn 来表示链的其余部分应该在特定的调度程序上执行。如果这里的操作是 I/O 绑定(bind)的,那么 Schedulers.elastic() 是最好的选择,如果它是 CPU 绑定(bind)的,那么 Schedulers.paralell 是更好的选择。这是一个例子:

@PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {

return something.collectList()
.publishOn(Schedulers.elastic())
.map(things -> {
return processThings(things);
})
.then();
}

public ProcessingResult processThings(List<Something> things) {
//...
}

有关该主题的更多信息,请查看 Scheduler section in the reactor docs .如果您的应用程序倾向于做很多这样的事情,您将失去 react 流的很多好处,您可能会考虑切换到基于 Servlet 的模型,在该模型中您可以相应地配置线程池。

关于spring-webflux - 如何在订阅中用阻塞操作包装 Flux?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52071249/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com