gpt4 book ai didi

java - 当 Spring Cloud Stream 响应式(Reactive)使用者遇到异常时,为什么我会收到 onComplete 信号?

转载 作者:行者123 更新时间:2023-12-04 08:49:10 24 4
gpt4 key购买 nike

我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 一起使用并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。添加 .log() 时调用我看到的链式店 onNext/ onComplete信号,我希望看到 onError信号。
我的实际代码如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是, onError使用 Spring Cloud Stream 时不会出现信号。如果我只是这样调用我的服务 myService.processMessage(msg)在单元测试并模拟异常时,我的 react 链将正确传播错误信号。
当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?
在我的非平凡代码中,我确实注意到此错误消息可能与为什么我没有收到错误信号有关?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
为了进一步混淆,我可以得到 onError如果我将 Spring Cloud Stream 绑定(bind)切换到非响应式实现,则在我的响应式链中发出信号,如下所示:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}

最佳答案

所以这是我从自己的调查中收集到的,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor Language”,但这就是我最终解决它的方式......
Hoxton.SR5 , 一个 onErrorContinue was included on the reactive binding管理通量订阅。 onErrorContinue 的问题是它影响上游 通过在失败的运算符上应用 BiConsumer 函数(如果支持)。
这意味着当我们的 map 中发生错误时/flatMap运营商,onErrorContinue BiConsumer 将启动并将下游信号修改为 onComplete() ( Mono<T> ) 或 request(...) (如果它从 Flux<T> 请求新元素)。这导致了我们的 doOnError(...)由于没有 onError(),运算符未执行信号。
最终 SCS 团队决定 remove this error handling wrapper . Hoxton.SR6不再有这个onErrorContinue .但是,这意味着传播到 SCS 绑定(bind)的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。
此错误处理已传递给客户端,我们添加了 onErrorResume运算符(operator)到内部发布者以有效地丢弃错误信号。在 myService::processMessage 中遇到错误时出版商, onErrorResume 将发布者切换到作为参数传入的后备发布者,并从运营商链中的该点恢复。在我们的例子中,这个后备发布者只返回 Mono.empty()这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。onErrorResume示例/说明
上述技术可以用一个非常简单的例子来说明。

Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
Flux<Integer>以上将输出以下内容:
Element: 1
Element: 4
Element: 5
Element: 6
由于在元素 2 处遇到错误, onErrorResume回退开始,新发布者变为 Flux.just(4, 5, 6)有效地从回退中恢复。在我们的例子中,我们不想影响源发布者(即 Flux.just(1, 2, 3) )。我们只想删除错误的元素( 2)并继续下一个元素( 3)。
我们不能简单地改变 Flux.just(4, 5, 6)Flux.empty()Mono.empty()像这样:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
这将导致输出以下内容:
Element: 1
这是因为 onErrorResume已将上游发布者替换为后备发布者(即 Mono.empty() )并从那时起恢复。
为了实现我们想要的输出:
Element: 1
Element: 3
我们必须将 onErrorResume flatMap 的内部发布者上的运算符:
public Mono<Integer> func(int i) {
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}

Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
现在, onErrorResume仅影响 func(i) 返回的内部发布者.如果 func(i) 中的运算符发生错误, onErrorResume将回退到 Mono.empty()有效完成 Mono<T>没有炸毁。这也仍然允许在 doOnError 中使用错误处理运算符(例如 func(i) )在回退运行之前应用。这是因为,不像 onErrorContinue ,它不会影响上游操作符并改变错误位置的下一个信号。
最终解决方案
在我的问题中重用代码片段,我已将 Spring Cloud 版本升级到 Hoxton.SR6并将代码更改为如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
请注意 onErrorResume位于内部发布者(在 flatMap 内)。

关于java - 当 Spring Cloud Stream 响应式(Reactive)使用者遇到异常时,为什么我会收到 onComplete 信号?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64163300/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com