gpt4 book ai didi

java - Project Reactor-如何处理Flux.interval的OverflowException?

转载 作者:行者123 更新时间:2023-12-03 15:43:28 28 4
gpt4 key购买 nike

我正在使用Spring Webflux构建Spring Boot应用程序,并且我想使该应用程序完全不阻塞。该应用程序本身具有一些REST端点和一个需要每几秒钟运行的批处理作业。对于批处理作业,我正在尝试使用Flux.interval(Duration.ofMillis(1000))生成长值,而忽略并运行预定的作业。

Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething())
.subscribe();

但是一段时间后我得到了错误
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
有人可以告诉我如何克服这个问题吗?

最佳答案

造成此问题的原因很可能是doSomething()操作花费的时间超过了指定的Flux间隔,这意味着一段时间后doSomething作业彼此重叠并且反压开始。按需),并且Flux.interval具有默认的并发限制(256),运算符(operator)不知所措,这导致flatMap

根据您的要求,有两个潜在的解决方案:

1.忽略溢出错误并丢弃可能溢出的信号

这意味着有时候,如果我们已经有很多(256)正在进行,我们会跳过一秒钟并且不在此间隔内安排作业。

Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())

2.将 OverflowException并发设置为更高的值

一段时间后,这仍可能导致OverflowException,但会延迟问题(可能不是最佳解决方案)。

Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething(), Integer.MAX_VALUE)

3.不要让工作彼此重叠

我们从热源切换到冷源,从而消除了溢出的可能性。但是,我们无法保证每秒安排一次事件。相反,它们将在上一份工作完成且至少经过1秒后按需安排。

Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())

如果您适合重叠工作,并且可以在 flatMap调用中定义合理的并发级别,那么也可以将此解决方案与上一个解决方案结合使用。

关于java - Project Reactor-如何处理Flux.interval的OverflowException?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60077245/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com