
java - WebTestClient returns IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread

Reposted. Author: 行者123. Updated: 2023-12-04 14:26:28

The following function:

private Boolean canDoIt(Parameter param) {
    return myService
        .getMyObjectInReactiveWay(param)
        .map(myObject -> myService.checkMyObjectInImperativeWay(myObject))
        .block();
}
works fine at runtime, but when I use WebTestClient to test a flow that calls it, I get the following error:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.1.jar:3.4.1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
reactor.core.publisher.Mono.flatMap
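I know I should not use block(), but I have no choice: the function must return a Boolean (not a Mono<Boolean>). Maybe there is an alternative way to write it without block().
Is there a way to make WebTestClient not throw that error?
I am using Reactor Core version 3.4.6.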

Best answer

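Confirming my comment: block() checks whether the calling thread is compatible with blocking code (a thread outside Reactor, or a thread of a specific Reactor scheduler such as Schedulers.boundedElastic()).
There are two ways to handle a blocking call in the middle of a reactive stack: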
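The thread check described above can be observed directly. The following is a minimal sketch, assuming reactor-core 3.4.x on the classpath; the class name ThreadCheckSketch and the helper isNonBlockingOn are made up for illustration and are not part of the original answer. It uses Schedulers.isInNonBlockingThread(), which reports whether the current thread is one that Reactor marks as non-blocking.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ThreadCheckSketch {

    // Hypothetical helper: evaluates the check on a thread owned by the given scheduler.
    // The block() here is only safe when the caller is itself on a blocking-compatible thread.
    static boolean isNonBlockingOn(Scheduler scheduler) {
        return Mono.fromCallable(Schedulers::isInNonBlockingThread)
                .subscribeOn(scheduler) // run the callable on the scheduler's thread
                .block();
    }

    public static void main(String[] args) {
        // A plain JVM thread: blocking is allowed.
        System.out.println("main: " + Schedulers.isInNonBlockingThread());
        // parallel threads are marked non-blocking, so block() is rejected there.
        System.out.println("parallel: " + isNonBlockingOn(Schedulers.parallel()));
        // boundedElastic threads are meant to host blocking work.
        System.out.println("boundedElastic: " + isNonBlockingOn(Schedulers.boundedElastic()));
    }
}
```

A result of true for a scheduler means that calling block() on one of its threads would raise the IllegalStateException from the question.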

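  • Use the share() operator on the publisher you are going to block on. Note that share() caches the value internally.
  • Run the block() call on a blocking-compatible scheduler by using subscribeOn or publishOn. Note that these operators should not be applied to the publisher on which block() is called directly, but to the publisher that "wraps" the blocking call (see the example below).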

Some references:

  • The Schedulers API doc summarizes the available scheduler types and states which ones are designed for blocking calls.
  • Reference guide section about blocking call wrapping
  • share() operator documentation

A minimal reproducible example (tested on v3.4.6) gives this output:
    Ok context: not running from reactor Threads
    value is true
    Problematic stack: working with scheduler not compatible with blocking call
    ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2
    Bad way to subscribe on a blocking compatible scheduler
    ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-4
    Bad way to publish on blocking compatible scheduler
    ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-6
    Possible workaround: share the reactive stream before blocking on it
    It worked
    Right way to subscribe on blocking compatible scheduler
    It worked
    Right way to publish on blocking compatible scheduler
    It worked
    And here is the code:
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    import java.time.Duration;
    import java.util.concurrent.Callable;
    import java.util.function.Supplier;

    public class BlockingWorkaround {

        public static void main(String[] args) throws Exception {
            System.out.println("Ok context: not running from reactor Threads");
            System.out.println("value is " + blockingFunction());

            System.out.println("Problematic stack: working with scheduler not compatible with blocking call");
            executeAndWait(() -> blockingFunction());

            System.out.println("Bad way to subscribe on a blocking compatible scheduler");
            executeAndWait(() -> blockingFunctionUsingSubscribeOn());

            System.out.println("Bad way to publish on blocking compatible scheduler");
            executeAndWait(() -> blockingFunctionUsingPublishOn());

            System.out.println("Possible workaround: share the reactive stream before blocking on it");
            executeAndWait(() -> blockingFunctionShared());

            System.out.println("Right way to subscribe on blocking compatible scheduler");
            subscribeOnAndWait(() -> blockingFunction());

            System.out.println("Right way to publish on blocking compatible scheduler");
            publishOnAndWait(() -> blockingFunction());
        }

        static Boolean blockingFunction() {
            return delay()
                    .flatMap(delay -> Mono.just(true))
                    .block();
        }

        static Boolean blockingFunctionShared() {
            return delay()
                    .flatMap(delay -> Mono.just(true))
                    .share() // Mono result is cached internally
                    .block();
        }

        static Boolean blockingFunctionUsingSubscribeOn() {
            return delay()
                    .subscribeOn(Schedulers.boundedElastic())
                    .flatMap(delay -> Mono.just(true))
                    .block();
        }

        static Boolean blockingFunctionUsingPublishOn() {
            return delay()
                    .flatMap(delay -> Mono.just(true))
                    .publishOn(Schedulers.boundedElastic())
                    .block();
        }

        static Mono<Long> delay() {
            return Mono.delay(Duration.ofMillis(10));
        }

        private static void executeAndWait(Supplier<Boolean> blockingAction) throws InterruptedException {
            delay()
                    .map(it -> blockingAction.get())
                    .subscribe(
                            val -> System.out.println("It worked"),
                            err -> System.out.println("ERROR: " + err.getMessage())
                    );

            Thread.sleep(100);
        }

        private static void subscribeOnAndWait(Callable<Boolean> blockingAction) throws InterruptedException {
            final Mono<Boolean> blockingMono = Mono.fromCallable(blockingAction)
                    .subscribeOn(Schedulers.boundedElastic()); // Upstream is executed on given scheduler

            delay()
                    .flatMap(it -> blockingMono)
                    .subscribe(
                            val -> System.out.println("It worked"),
                            err -> System.out.println("ERROR: " + err.getMessage())
                    );

            Thread.sleep(100);
        }

        private static void publishOnAndWait(Supplier<Boolean> blockingAction) throws InterruptedException {
            delay()
                    .publishOn(Schedulers.boundedElastic()) // Cause downstream to be executed on given scheduler
                    .map(it -> blockingAction.get())
                    .subscribe(
                            val -> System.out.println("It worked"),
                            err -> System.out.println("ERROR: " + err.getMessage())
                    );

            Thread.sleep(100);
        }
    }
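Applied to the function from the question, the "wrap the blocking call" approach might look like the sketch below. It assumes reactor-core 3.4.x. MyService here is a hypothetical stand-in (the question does not show the real service), String replaces the question's Parameter and object types, and canDoItWrapped is an invented name. canDoIt keeps its blocking Boolean signature; only its caller changes.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class CanDoItSketch {

    // Hypothetical stand-in for the question's myService; not taken from the original post.
    static class MyService {
        Mono<String> getMyObjectInReactiveWay(String param) {
            // The delay makes the value arrive asynchronously, so block() really has to wait.
            return Mono.delay(Duration.ofMillis(10)).map(tick -> "object-for-" + param);
        }

        boolean checkMyObjectInImperativeWay(String myObject) {
            return myObject.startsWith("object-for-");
        }
    }

    static final MyService myService = new MyService();

    // Same shape as the question's function: it blocks and returns a plain Boolean.
    static Boolean canDoIt(String param) {
        return myService
                .getMyObjectInReactiveWay(param)
                .map(myService::checkMyObjectInImperativeWay)
                .block();
    }

    // The reactive caller wraps the blocking function and moves it to a
    // blocking-compatible scheduler instead of calling it on its own thread.
    static Mono<Boolean> canDoItWrapped(String param) {
        return Mono.fromCallable(() -> canDoIt(param))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        // Mono.delay emits on a parallel thread, where a direct canDoIt call would fail;
        // flatMap hands the work over to boundedElastic through the wrapper.
        Boolean result = Mono.delay(Duration.ofMillis(10))
                .flatMap(tick -> canDoItWrapped("demo"))
                .block(Duration.ofSeconds(5)); // safe: main is not a Reactor thread
        System.out.println("canDoIt via wrapper: " + result);
    }
}
```

Whether this fits the WebTestClient scenario depends on where canDoIt is invoked: the wrapper only helps if the calling code can work with a Mono<Boolean> at that point.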

This post is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/67534602/
