- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
使用热发布者模型,在云环境中大约有 50% 的时间会发生以下超时异常:
[ERROR] reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 300000ms (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 300000ms (and no fallback has been configured)
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)
reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
datadog.trace.instrumentation.reactor.core.TracingSubscriber.onNext(TracingSubscriber.java:75)
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
datadog.trace.instrumentation.reactor.core.TracingSubscriber.onNext(TracingSubscriber.java:75)
reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
但是我不能在本地复制这个。我的主要理论是原因是在创建通量期间发生的,而在
Flux
之间发生了一些不好的事情。和
MessageListenerContainer
.创作看起来像:
@Bean
public Flux<RabbitEventPublishEnvelope> masterFlux(
Queue eventQueue,
ObjectMapper objectMapper,
MessageListenerContainerFactory messageListenerContainerFactory) {
log.info("Create a listener for the topic queue: '{}'", eventQueue.getName());
MessageListenerContainer mlc = messageListenerContainerFactory
.createDirectMessageListenerContainer(eventQueue.getName());
log.info("Define the master Flux for event subscriptions on queue '{}'", eventQueue.getName());
Flux<RabbitEventPublishEnvelope> masterFlux = Flux.create(emitter -> {
mlc.setupMessageListener(m -> {
RabbitEventPublishEnvelope payload = null;
try {
log.info("Creating payload");
payload = objectMapper.readValue(m.getBody(), RabbitEventPublishEnvelope.class);
} catch (IOException e) {
log.error("Failed to parse RabbitEventPublishEnvelope:\n{}", m.getBody());
throw new RuntimeException("Failed to parse RabbitEventPublishEnvelope", e);
} catch (Exception e) {
log.error("Unhandled exception in Flux.create(): {}", e.getMessage(), e);
}
log.info("Emitting payload");
emitter.next(payload);
});
emitter.onRequest(v -> {
log.info("MLC starting");
mlc.start();
log.info("Start recipe event subscription");
});
emitter.onDispose(() -> {
// WARNING: DO NOT issue `mlc.stop();` here or it will cause responses to hang.
// The main reason this callback handler is implemented is to document what will break our implementation.
log.info("Done with recipe event subscription");
});
});
log.info("Created master flux for queue = '{}'", eventQueue.getName());
return masterFlux
.log("Publishing flux")
.publish()
.autoConnect()
.timeout(Duration.ofMillis(10000))
.doOnError(error -> log.error("Unhandled exception in masterFlux(): {}", error.getMessage(), error))
.log("Auto connection successful");
}
此异常感觉像是配置错误,但我无法缩小导致此超时的原因以确认这一点。任何帮助,将不胜感激!
最佳答案
引用:https://www.codota.com/code/java/methods/reactor.core.publisher.Flux/timeout
我怀疑问题是在 masterFlux 的创建过程中没有指定超时。
引用:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
create() 有一个静态方法,定义如下:
创建(消费者<? super FluxSink> 发射器,FluxSink.OverflowStrategy 背压)
所以,我猜,看到OverflowStrategy还没有定义,如果消息传输失败,代码不知道该怎么做,抛出这个异常。可以在此处找到有关 OverflowStrategy 的更多信息:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.OverflowStrategy.html
关于java - 使用 RabbitMq MessageListenerContainer 的 Reactor Flux 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63363214/
我看到spring Kafka代码,我有一些疑问: 如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 会创建一个 MessageListenerCon
简而言之,我尝试用 lambda 语法编写此内容: bean = context.getBean(JmsListenerEndpointRegistry.class); for (MessageLis
我想知道如果指向某个主题的 MessageListenerContainer 同时收到 2 条(或更多)消息,会发生什么情况。 例如,应用的 2 个用户同时触发发布,因此 2 个 jmstemplat
我有一个 Spring 应用程序,它必须使用来自某些 JMS 队列的消息。队列的数量必须是可配置的,因此我们必须通过读取配置文件来手动创建消费者。因此我可以拥有类型 1 的 x 队列和类型 2 的 y
使用热发布者模型,在云环境中大约有 50% 的时间会发生以下超时异常: [ERROR] reactor.core.scheduler.Schedulers - Sch
我的 spring-config.xml 中有以下 xml 代码 现在,我正在将我的 spring xml 配置文件转换为 Java 配置。 我翻译成这样 @Bean(name
我是一名优秀的程序员,十分优秀!