gpt4 book ai didi

java - 注入(inject) RetryTemplate 时,BackOffPolicy 和 SimpleRetryPolicy 不生效

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:00:27 24 4
gpt4 key购买 nike

我正在使用 Spring AMQP 发送消息并能够对“自定义”异常执行重试。假设我有一个 Receiver,它抛出一个自定义异常“EventException”,为此,我希望有 n 次重试(在我们的示例 5 中)。在重试之间,我也希望有 5 秒的延迟。这是我的源代码:

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

final static String queueName = "testing-queue";

@Autowired
AnnotationConfigApplicationContext context;

@Autowired
RabbitTemplate rabbitTemplate;

@Bean
Queue queue() {
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead-letter-exchange");
Queue queue = new Queue(queueName, true, false, false, arguments);
return queue;
}

@Bean
TopicExchange exchange() {
return new TopicExchange("testing-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
Queue deadLetterQueue() {
return new Queue("dead-letter-queue", true);
}

@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange("dead-letter-exchange");
}

@Bean
Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
}

@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");

connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

return connectionFactory;
}

@Bean
SimpleMessageListenerContainer container(
ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
RetryOperationsInterceptor interceptor) {

Advice[] adviceChain = { interceptor };

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setAdviceChain(adviceChain);
container.setMessageListener(listenerAdapter);

return container;
}

@Bean
Receiver receiver() {
return new Receiver();
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
MessageListenerAdapter adapter =
new MessageListenerAdapter(receiver, "receiveMessage");

return adapter;
}

@Bean
RetryOperations retryTemplate() {
Map<Class<? extends Throwable>, Boolean> retryableExceptions =
new HashMap<Class<? extends Throwable>, Boolean>();
retryableExceptions.put(EventException.class, false);

FixedBackOffPolicy backoffPolicy = new FixedBackOffPolicy();
backoffPolicy.setBackOffPeriod(5000);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backoffPolicy);
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5, retryableExceptions));

return retryTemplate;
}

@Bean
RetryOperationsInterceptor interceptor(RetryOperations retryTemplate) {
RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
interceptor.setRecoverer(new CustomMessageRecover());
interceptor.setRetryOperations(retryTemplate);

return interceptor;
// return RetryInterceptorBuilder
// .stateless()
// //.retryOperations(retryTemplate)
// .maxAttempts(5)
// .recoverer(new CustomMessageRecover()).build();
}

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(DemoApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
context.close();
}

public class Receiver {

public void receiveMessage(String message) throws Exception {
System.out.println("!!!!!!!!Message has been recieved!!!!!!");
throw new EventException("TESTING");
}
}

public class CustomMessageRecover implements MethodInvocationRecoverer<Void> {

@Override
public Void recover(Object[] args, Throwable cause) {
System.out.println("IN THE RECOVER ZONE!!!");
throw new AmqpRejectAndDontRequeueException(cause);
}
}

class EventException extends Exception {
private static final long serialVersionUID = 1L;

public EventException() {}

public EventException(String message) {
super(message);
}
}
}

现在在代码中,如您所见,我正在使用 RetryOperationsInterceptor 来拦截并检查抛出的异常类型,并基于此决定执行重试与否,以及重试之间的延迟。

为此,我设置了 RetryTemplate Bean 的 backoffPolicyretryPolicy 并将其注入(inject) RetryOperationsInterceptor。

如果有人能帮助我并告诉我为什么重试和重试之间的延迟不起作用,我将不胜感激。我的消息直接进入死信交换,无需重试和延迟。

谢谢!

最佳答案

您的问题在这里:

retryableExceptions.put(EventException.class, false);

请找到 SimpleRetryPolicy 代码:

public boolean canRetry(RetryContext context) {
Throwable t = context.getLastThrowable();
return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;
}

还有:

private boolean retryForException(Throwable ex) {
return retryableClassifier.classify(ex);
}

因为您为您的 EventException 指定了一个 false,它不会是可重试的。因此会进行任何重试和退避。

关于java - 注入(inject) RetryTemplate 时,BackOffPolicy 和 SimpleRetryPolicy 不生效,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29575836/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com