gpt4 book ai didi

java - 用@KafkaListener注释的方法不会传播遇到的异常。由于此原因无法调用我的重试配置

转载 作者:太空宇宙 更新时间:2023-11-04 09:20:20 25 4
gpt4 key购买 nike

我有一个用 @kafkaListener 注释的 Kafka Listener 方法。它接受消息类型的参数和确认。我处理收到的消息并使用acknowledgement.acknowledge() 进行手动提交。我已经在容器上设置了重试模板。重试策略是特定于异常定义的。为此,我创建了自己的 RetryPloicy 类并使用 ExceptionClassifierRetryPolicy 进行扩展。在该类中,根据收到的异常,我返回 AlwaysRetryPolicy、NeverRetryPolicy 和 SimpleRetryPolicy。我遇到的问题是,当在监听器方法中处理消息期间发生 DataAccessException 时,我想永远重试,并且相应地配置了重试策略,但是监听器方法总是抛出 ListenerExecutionFailedException,而不是在上面的消息处理方法中的监听器方法之前在堆栈下方抛出遇到的异常。由于此异常是由监听器引发的,因此我的重试配置无法按预期工作。

示例代码如下:

    @KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroType> message, Acknowledgement ack){
SomeAvroType type = message.getPayLoad();
type.processIncomingMessage();
ack.acknowledge();
}

重试策略配置

    @component 
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(3);

this.setExceptionClassifier( new Classifier<Throwable, RetryPolicy>()
{
@Override
public RetryPolicy classify( Throwable classifiable ){

// Always Retry when instanceOf TransientDataAccessException
if ( classifiable instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy;
}
else if(classifiable instanceOf SomeOtherException){

return simpleRetryPolicy;
}

// Do not retry for other exceptions
return new NeverRetryPolicy();
}
} );
}
}

我使用容器上提供的大部分自动配置,因此我在 Retry Config 类中自动连接 ConcurrentKafkaListenerContainerFactory。

    @configuration
public class RetryConfig{


@Bean
public RetryTemplate retryTemplate(@Autowired @Qualifier("kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory factory;){

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(1000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset)

}
}

当我在 Debug模式下运行此程序,并在 processIncomingMessage() 中抛出 TransientDataAccessException 时,我希望始终重试,但监听器方法不会抛出传播的异常,但会抛出 ListenerExecutionFailedException ,并且其原因(e.getCause())是 TransientDataAccessException。因此,重试策略始终评估为 NeverretryPloicy。有没有办法在监听器中抛出传播的异常,以便我的重试配置正确执行?

最佳答案

请参阅BinaryExceptionClassifier及其 traverseCauses 属性。

/**
* Create a binary exception classifier.
* @param defaultValue the default value to use
* @param typeMap the map of types to classify
* @param traverseCauses if true, throwable's causes will be inspected to find
* non-default class
*/
public BinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue,
boolean traverseCauses) {
super(typeMap, defaultValue);
this.traverseCauses = traverseCauses;
}

关于java - 用@KafkaListener注释的方法不会传播遇到的异常。由于此原因无法调用我的重试配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58386429/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com