- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
使用热发布者模型,在云环境中大约有 50% 的时间会发生以下超时异常:
[ERROR] reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 300000ms (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 300000ms (and no fallback has been configured)
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)
reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
datadog.trace.instrumentation.reactor.core.TracingSubscriber.onNext(TracingSubscriber.java:75)
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
datadog.trace.instrumentation.reactor.core.TracingSubscriber.onNext(TracingSubscriber.java:75)
reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
但是我不能在本地复制这个。我的主要理论是原因是在创建通量期间发生的,而在
Flux
之间发生了一些不好的事情。和
MessageListenerContainer
.创作看起来像:
@Bean
public Flux<RabbitEventPublishEnvelope> masterFlux(
Queue eventQueue,
ObjectMapper objectMapper,
MessageListenerContainerFactory messageListenerContainerFactory) {
log.info("Create a listener for the topic queue: '{}'", eventQueue.getName());
MessageListenerContainer mlc = messageListenerContainerFactory
.createDirectMessageListenerContainer(eventQueue.getName());
log.info("Define the master Flux for event subscriptions on queue '{}'", eventQueue.getName());
Flux<RabbitEventPublishEnvelope> masterFlux = Flux.create(emitter -> {
mlc.setupMessageListener(m -> {
RabbitEventPublishEnvelope payload = null;
try {
log.info("Creating payload");
payload = objectMapper.readValue(m.getBody(), RabbitEventPublishEnvelope.class);
} catch (IOException e) {
log.error("Failed to parse RabbitEventPublishEnvelope:\n{}", m.getBody());
throw new RuntimeException("Failed to parse RabbitEventPublishEnvelope", e);
} catch (Exception e) {
log.error("Unhandled exception in Flux.create(): {}", e.getMessage(), e);
}
log.info("Emitting payload");
emitter.next(payload);
});
emitter.onRequest(v -> {
log.info("MLC starting");
mlc.start();
log.info("Start recipe event subscription");
});
emitter.onDispose(() -> {
// WARNING: DO NOT issue `mlc.stop();` here or it will cause responses to hang.
// The main reason this callback handler is implemented is to document what will break our implementation.
log.info("Done with recipe event subscription");
});
});
log.info("Created master flux for queue = '{}'", eventQueue.getName());
return masterFlux
.log("Publishing flux")
.publish()
.autoConnect()
.timeout(Duration.ofMillis(10000))
.doOnError(error -> log.error("Unhandled exception in masterFlux(): {}", error.getMessage(), error))
.log("Auto connection successful");
}
此异常感觉像是配置错误,但我无法缩小导致此超时的原因以确认这一点。任何帮助,将不胜感激!
最佳答案
引用:https://www.codota.com/code/java/methods/reactor.core.publisher.Flux/timeout
我怀疑问题是在 masterFlux 的创建过程中没有指定超时。
引用:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
create() 有一个静态方法,定义如下:
创建(消费者<? super FluxSink> 发射器,FluxSink.OverflowStrategy 背压)
所以,我猜,看到OverflowStrategy还没有定义,如果消息传输失败,代码不知道该怎么做,抛出这个异常。可以在此处找到有关 OverflowStrategy 的更多信息:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.OverflowStrategy.html
关于java - 使用 RabbitMq MessageListenerContainer 的 Reactor Flux 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63363214/
我正在尝试找出我们应该在下一个项目中使用 Akka 还是 Reactor。最重要的问题之一是 future 选择的框架是否会提供远程处理。正如我所看到的,Akka 以我们想要的方式提供了这个。 在 G
假设我有一个返回 Mono 的 repository.save(..) 方法。 还可以说我有一个 repository.findByEmail(..) 它返回一个 Mono。 问题: 我希望第一个 M
有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理? 当然,不必添加一个未知持续时间的 sleep ...... @Test public void group
我创建了一个简单的 Kafka 消费者,它返回 Flux对象(收到的消息),我正在尝试使用 StepVerifier 对其进行测试. 在我的测试中,我做了这样的事情: Flux flux = cons
我目前正在研究resilience4j 库,由于某种原因,以下代码无法按预期工作: @Test public void testRateLimiterProjectReactor() { //
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 7 年前。 Improv
当多个 onErrorContinue添加到管道 处理从 flatMap 抛出的特定类型的异常 ,异常处理没有按预期工作。 我希望下面的代码应该删除元素 1 到 6,而订阅者应该使用元素 7 到 10
我有一个 NettyServerCustomizer,下一个代码: @Override public HttpServer apply(final HttpServer httpServer)
我们正在评估 reactor 库以便在我们的项目中使用它。我们的项目得到了 spring 上下文的支持。因此,我们需要一个工具来构建具有 spring 支持的事件驱动应用程序。 此外,我们的主要关注领
不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容: Flux someFastPublisher; so
我正在使用 Spring WebClient 调用休息服务。如下所述的 post 调用代码。 Mono response = client.post()
我希望在 react 器运行后添加更多协议(protocol)和工厂。我找不到说明这是允许的文档。当我在 reactor.connectTCP 之前运行 reactor.run 时,程序会在工厂中围绕
(译者加)本文档的一些典型的名词如下: Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅 n.)、subscribe(订阅 v.)。 event/signa
Project Reactor Mono 是否有运算符或一些好的方法来实现 doOnEmpty() 的行为? 我想对操作结果产生副作用(日志记录)。 这是我现在拥有的: myMono .map(v
在以下两个示例中,处理通量流的行为似乎有所不同。 示例 1: public static void main(String[] args) throws InterruptedException {
是否可以使用 LoggingMeterRegistry 以微米为单位收集项目 react 器的通量指标? 最佳答案 只需添加 LoggingMeterRegistry 就可以了到全局 Micromet
我想知道在spring web-flux中使用先前映射结果的好方法,例如 Mono.just(request) ... .flatMap(object0 -> createObject1(object
如何将具有1个元素的助焊剂转换为单声道? Flux.fromArray(arrayOf(1,2,1,1,1,2)) .distinct()
我想知道Reactor和分页的HTTP API。我有一个private fun getPage(pageNumber: Int): Mono。该资源具有“numberOfPages”字段,我想获取所有
我有一个包含多个 URL 和端口的数组。对于他们每个人,我需要发送和接收返回的内容: Flux.fromArray(trackersArray) .flatMap(tracker ->
我是一名优秀的程序员,十分优秀!