gpt4 book ai didi

RabbitMQ 出现异常时的延迟消息

转载 作者:行者123 更新时间:2023-12-04 11:34:42 25 4
gpt4 key购买 nike

我正在使用该插件 ( https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq ) 并且运行良好。我发送了一条延迟 X 秒的消息,它的处理延迟了 X 秒。
问题出在流程逻辑上。如果它运行良好(快乐路径)我没有问题。但是如果过程失败,我期望的是消息重新排队以相同的延迟处理,但它是立即处理的。
有没有办法在出现异常时使用原始指定的延迟自动重新排队消息?

最佳答案

不;延迟不适用于重试;如果消息顺序不重要,您可以将消息重新发布到队列的尾部。
或者,您可以配置具有固定回退的重试拦截器。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#retry

Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. ...


编辑
使用 RabbitTemplate而不是消息驱动的监听器:
@SpringBootApplication
@EnableScheduling
@EnableTransactionManagement
public class So69020120Application {

public static void main(String[] args) {
SpringApplication.run(So69020120Application.class, args);
}

@Autowired
Processor processor;

@Scheduled(fixedDelay = 5000)
public void sched() {
try {
while (this.processor.process()) {
}
}
catch (Exception e) {
e.printStackTrace();
}
}

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory cf) {
return new RabbitTransactionManager(cf);
}

@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
IntStream.range(0, 4).forEach(i -> template.convertAndSend("queue", "good"));
template.convertAndSend("queue", "fail");
IntStream.range(0, 4).forEach(i -> template.convertAndSend("queue", "good"));
};
}

@Bean
Queue queue() {
return new Queue("queue");
}

}

@Component
class Processor {

private final RabbitTemplate template;

private final AtomicBoolean fail = new AtomicBoolean(true);

Processor(RabbitTemplate template) {
this.template = template;
template.setChannelTransacted(true);
}

@Transactional
public boolean process() {
String data = (String) template.receiveAndConvert("queue");
if (data == null) {
System.out.println("No More Messages");
return false;
}
System.out.println(data);
if (data.equals("fail") && this.fail.getAndSet(false)) {
throw new RuntimeException("test");
}
return true;
}

}
good
good
good
good
fail
java.lang.RuntimeException: test
at com.example.demo.Processor.process(So69020120Application.java:86)
at com.example.demo.Processor$$FastClassBySpringCGLIB$$6adeaa38.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.example.demo.Processor$$EnhancerBySpringCGLIB$$bad30db1.process(<generated>)
at com.example.demo.So69020120Application.sched(So69020120Application.java:36)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
fail
good
good
good
good
No More Messages

关于RabbitMQ 出现异常时的延迟消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69020120/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com