gpt4 book ai didi

spring-boot - 如何使用 netty-reactor 从阻塞调度程序切换回以前的调度程序?

转载 作者:行者123 更新时间:2023-12-05 03:31:56 32 4
gpt4 key购买 nike

如何使用 Spring Webflux + Netty + Reactor 从阻塞调度器(blocking-pool)切换回之前的调度器(reactor-http-nio)?

代码:

@RequiredArgsConstructor
@Service
@Slf4j
public class BookService {

private final IBookRepo bookRepo;

private final BlockingPoolConfig blockingPoolConfig;

public Mono<Optional<Book>> getBook(Long id) {
log.debug("getBook() - id: {}", id);
return asyncCallable(() -> {
log.trace("getBook() - invoking bookRepo.findById(id) ...");
return bookRepo.findById(id);
});
}

protected <S> Mono<S> asyncCallable(Callable<S> callable) {
return Mono.fromCallable(callable)
.subscribeOn(blockingPoolConfig.blockingScheduler());
}
}

@RestController
@RequiredArgsConstructor
@Slf4j
public class BookController {

private final BookService bookService;

@GetMapping("/book/{id}")
public Mono<Book> get(@PathVariable Long id) {
log.debug("get() - id: {}", id);
return bookService.getBook(id)
.publishOn(Schedulers.parallel()) //publishOn(... ?)
.map(optionalBook -> {
return optionalBook.map(book -> {
log.debug("get() result: {}", book);
return book;
}).orElseThrow(() -> {
log.debug("book with id: {} is not found.", id);
return new ResponseStatusException(HttpStatus.NOT_FOUND, "Book not found");
});
});
}

@Configuration
@Slf4j
public class BlockingPoolConfig {

@Value("${spring.datasource.maximumPoolSize:8}")
private int connectionPoolSize = 1;

@Scope("singleton")
@Bean
public Scheduler blockingScheduler() {
Scheduler scheduler = Schedulers.newBoundedElastic(connectionPoolSize, connectionPoolSize, "blocking-pool");
return scheduler;
}
}

上面我使用的是 publishOn(Schedulers.parallel()),但是这个创建了新的线程池(并行)。而不是这个,我更喜欢切换 reactor-http-nio 线程池。

实际结果日志:

19:17:45.290 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [parallel-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)

预期结果日志:

19:17:45.290 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)

最佳答案

目前这是不可能的,因为 A) 这些 HTTP 线程不是由 Reactor Scheduler 控制的,而是由底层的 Netty 事件循环本身控制的,并且 B) 在 Java 中没有通用的方法来“将执行返回给(任意)线程”,如果该线程没有与之关联的 Executor/ExecutorService

对于 reactor-netty,一旦您退出了 HTTP 线程,就没有理由再切换回 Netty 线程了。发送响应后,reactor-netty 将自然完成。

假设阻塞池类似于 Schedulers.boundedElastic(),您可能确实想转到 Schedulers.parallel() 来限制阻塞线程的生命周期,这是一个完美的解决方案。

关于spring-boot - 如何使用 netty-reactor 从阻塞调度程序切换回以前的调度程序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70556969/

32 4 0