- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 一起使用并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。添加
.log()
时调用我看到的链式店
onNext
/
onComplete
信号,我希望看到
onError
信号。
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是,
onError
使用 Spring Cloud Stream 时不会出现信号。如果我只是这样调用我的服务
myService.processMessage(msg)
在单元测试并模拟异常时,我的 react 链将正确传播错误信号。
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
为了进一步混淆,我可以得到
onError
如果我将 Spring Cloud Stream 绑定(bind)切换到非响应式实现,则在我的响应式链中发出信号,如下所示:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}
最佳答案
所以这是我从自己的调查中收集到的,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor Language”,但这就是我最终解决它的方式......
在 Hoxton.SR5
, 一个 onErrorContinue
was included on the reactive binding管理通量订阅。 onErrorContinue
的问题是它影响上游 通过在失败的运算符上应用 BiConsumer 函数(如果支持)。
这意味着当我们的 map
中发生错误时/flatMap
运营商,onErrorContinue
BiConsumer 将启动并将下游信号修改为 onComplete()
( Mono<T>
) 或 request(...)
(如果它从 Flux<T>
请求新元素)。这导致了我们的 doOnError(...)
由于没有 onError()
,运算符未执行信号。
最终 SCS 团队决定 remove this error handling wrapper . Hoxton.SR6
不再有这个onErrorContinue
.但是,这意味着传播到 SCS 绑定(bind)的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。
此错误处理已传递给客户端,我们添加了 onErrorResume
运算符(operator)到内部发布者以有效地丢弃错误信号。在 myService::processMessage
中遇到错误时出版商, onErrorResume
将发布者切换到作为参数传入的后备发布者,并从运营商链中的该点恢复。在我们的例子中,这个后备发布者只返回 Mono.empty()
这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。onErrorResume
示例/说明
上述技术可以用一个非常简单的例子来说明。
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
Flux<Integer>
以上将输出以下内容:
Element: 1
Element: 4
Element: 5
Element: 6
由于在元素
2
处遇到错误,
onErrorResume
回退开始,新发布者变为
Flux.just(4, 5, 6)
有效地从回退中恢复。在我们的例子中,我们不想影响源发布者(即
Flux.just(1, 2, 3)
)。我们只想删除错误的元素(
2
)并继续下一个元素(
3
)。
Flux.just(4, 5, 6)
至
Flux.empty()
或
Mono.empty()
像这样:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
这将导致输出以下内容:
Element: 1
这是因为
onErrorResume
已将上游发布者替换为后备发布者(即
Mono.empty()
)并从那时起恢复。
Element: 1
Element: 3
我们必须将
onErrorResume
flatMap
的内部发布者上的运算符:
public Mono<Integer> func(int i) {
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}
Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
现在,
onErrorResume
仅影响
func(i)
返回的内部发布者.如果
func(i)
中的运算符发生错误,
onErrorResume
将回退到
Mono.empty()
有效完成
Mono<T>
没有炸毁。这也仍然允许在
doOnError
中使用错误处理运算符(例如
func(i)
)在回退运行之前应用。这是因为,不像
onErrorContinue
,它不会影响上游操作符并改变错误位置的下一个信号。
Hoxton.SR6
并将代码更改为如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
请注意
onErrorResume
位于内部发布者(在
flatMap
内)。
关于java - 当 Spring Cloud Stream 响应式(Reactive)使用者遇到异常时,为什么我会收到 onComplete 信号?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64163300/
我是一名优秀的程序员,十分优秀!