gpt4 book ai didi

project-reactor - Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作

转载 作者:行者123 更新时间:2023-12-04 09:43:53 27 4
gpt4 key购买 nike

当多个 onErrorContinue添加到管道 处理从 flatMap 抛出的特定类型的异常 ,异常处理没有按预期工作。

我希望下面的代码应该删除元素 1 到 6,而订阅者应该使用元素 7 到 10。

public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))

.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}

public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}

public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}

这是我得到的输出:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

问题:为什么是第二个 onErrorContinue未被调用但异常发送给订阅者?

附加说明:
如果我删除第一个和第二个 onErrorContinue ,然后所有异常都由第三个 onErrorContinue 处理.我可以使用这种方法接收所有异常并检查异常类型并继续处理。但是,我想让异常处理更清晰,而不是添加 if..else堵塞。

这个问题与 Why does Thread.sleep() trigger the subscription to Flux.interval()?有何不同

1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成
3)这个问题没有任何关于线程的问题,即使添加 Thread.sleep(10000)之后 . subscribe ,行为没有变化。

最佳答案

这又归结为 onErrorContinue 的异常行为。 .它打破了规则,因为它不会“捕获”错误,然后因此改变下游的行为,它实际上允许支持运算符(operator)“向前看”并相应地采取行动,从而改变上游的结果。

这很奇怪,并且会导致一些不是很明显的行为,例如这里的情况。据我所知,所有支持的运营商都只期待下一个 onErrorContinue运算符,而不是递归搜索所有此类运算符。相反,他们将评估下一个 onErrorContinue 的谓词。 (在这种情况下,无论它是某种类型),然后相应地采取行动 - 如果谓词返回 true,则调用处理程序,或者如果不是,则向下游抛出错误。 (没有任何情况下它会移动到下一个 onErrorContinue 运算符,然后是下一个,直到匹配一个谓词。)

显然这是一个人为的例子 - 但由于这些特质,我几乎总是建议避免 onErrorContinue .有两种“正常”方式可能发生在 flatMap()参与:

  • flatMap()其中有一个“内部 react 链”,即它调用另一个方法或一系列返回发布者的方法 - 然后只需使用 onErrorResume()flatMap()调用来处理这些错误。您可以链接onErrorResume()因为它适用于下游,而不是上游运营商。这是迄今为止最常见的情况。
  • flatMap()是 if/else 的命令式集合,它返回不同的发布者,例如它在这里并且您想要/必须保持命令式样式,抛出异常而不是使用 Mono.error() ,并根据需要捕获,返回 Mono.empty()如果出现错误:
  •     .flatMap(number -> {
    try {
    if (number <= 3) {
    throw new NumberLessThanThree();
    } else if (number <= 6) {
    throw new NumberLessThanSixButGreaterThan3();
    } else {
    return Mono.just(number);
    }
    }
    catch(NumberLessThanThree ex) {
    //Handle it
    return Mono.empty();
    }
    catch(NumberLessThanSixButGreaterThan3 ex) {
    //As above
    }
    })

    一般来说,使用这两种方法之一可以更容易地推断出正在发生的事情。

    (为了阅读评论后的完整性 - 这与 react 链无法在主线程退出之前完成无关。)

    关于project-reactor - Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62207580/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com