java - Problem handling exceptions thrown by a RabbitMQ listener

Reposted. Author: 行者123. Updated: 2023-12-01 16:49:53

I have this listener in my project:

@Service
@RequiredArgsConstructor
@Slf4j
public class ConsumerService {

    @RabbitListener(queues = "${queue.treatment.request}")
    public void handleQueueTreatmentRequestMessageReception(AppointmentPayloadDTO myAppointment) {

        log.info(" ============================ Message received in queue-treatment-plan-new\n: " + myAppointment);

        log.info(" ============================ Creating new treatment plan ....");
    }
}

And this error handler:

@Configuration
public class RabbitMQErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        System.out.println("======================================================================================");
        System.out.println("error occurred in message listener and handled in error handler" + t.toString());
        System.out.println("======================================================================================");
    }
}

My goal is to handle MessageConversionException. The message is sent by another microservice, and the exception looks like this:


org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1693) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201]
Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert serialized Message content
at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:114) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:302) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:323) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:122) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:205) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:132) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 10 common frames omitted
Caused by: java.lang.IllegalArgumentException: Could not deserialize object
at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:94) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:110) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 16 common frames omitted
Caused by: java.io.InvalidObjectException: enum constant FOOBOO does not exist in class com.hospital.appointment.enums.Disease
at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2014) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1570) ~[na:1.8.0_201]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[na:1.8.0_201]
at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:91) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 17 common frames omitted
Caused by: java.lang.IllegalArgumentException: No enum constant com.hospital.appointment.enums.Disease.FOOBOO
at java.lang.Enum.valueOf(Enum.java:238) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2011) ~[na:1.8.0_201]
... 24 common frames omitted

So my problem now is that my ErrorHandler does not work at all. Does anyone know what I am doing wrong here?
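The bottom of the trace points at the root cause: the payload was Java-serialized and names an enum constant (FOOBOO) that the consumer's Disease enum does not define. The sketch below reproduces that kind of failure in plain Java, outside RabbitMQ. It is only an illustration under assumptions: `EnumMismatchDemo`, its nested `Disease` enum, and the constant names are hypothetical, and the "producer" is simulated by patching the serialized bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class EnumMismatchDemo {

    // Hypothetical stand-in for the consumer's enum; it has no constant named XYZ.
    enum Disease { FLU, COLD }

    private EnumMismatchDemo() {
    }

    // Serializes Disease.FLU, then overwrites the constant name in the byte
    // stream with an unknown name of the same length, imitating a producer
    // whose enum has a constant that the consumer lacks.
    static byte[] payloadWithUnknownConstant() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(Disease.FLU);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] bytes = buffer.toByteArray();
        byte[] known = "FLU".getBytes(StandardCharsets.UTF_8);
        byte[] unknown = "XYZ".getBytes(StandardCharsets.UTF_8);
        int start = bytes.length - known.length;
        // The constant name is expected at the end of the stream; fail loudly if not.
        if (!Arrays.equals(Arrays.copyOfRange(bytes, start, bytes.length), known)) {
            throw new IllegalStateException("unexpected serialization layout");
        }
        System.arraycopy(unknown, 0, bytes, start, unknown.length);
        return bytes;
    }

    // Returns the failure raised while deserializing, or null if none occurred.
    static Exception deserializationFailure(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            in.readObject();
            return null;
        } catch (IOException | ClassNotFoundException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(deserializationFailure(payloadWithUnknownConstant()));
    }
}
```

The failure happens while the payload is being read, before any listener method could be called, which matches the trace: the exception originates in the message converter, not in user code.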

Edit #1:

After reading the Spring documentation more carefully, I found exactly what was not working in my code:

https://docs.spring.io/spring-amqp/reference/html/#exception-handling

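Spring states explicitly:

There is, however, a class of errors where the listener cannot control the behavior. When a message that cannot be converted is encountered (for example, an invalid content_encoding header), some exceptions are thrown before the message reaches user code. ...

Specifically, it rejects failed messages that fail with the following errors:

o.s.amqp...MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.

A few lines later it describes a solution:

You can configure an instance of this error handler [the default ConditionalRejectingErrorHandler] with a FatalExceptionStrategy so that users can provide their own rules for conditional message rejection, for example a delegate implementation to the BinaryExceptionClassifier from Spring Retry (shown in Message Listeners and the Asynchronous Case). In addition, the ListenerExecutionFailedException now has a failedMessage property that you can use in the decision.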
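The kind of decision such a strategy makes can be sketched in plain Java. This is only an illustration under assumptions: `FatalCauseCheck`, `hasCause`, and `isFatal` are hypothetical names, and to stay free of Spring dependencies the sketch treats `java.io.InvalidObjectException` (the deserialization failure visible in the trace above) as the fatal marker. A real FatalExceptionStrategy would more likely look for MessageConversionException in the same way.

```java
import java.io.InvalidObjectException;

// Hypothetical helper, not part of Spring AMQP: decides whether a listener
// failure is "fatal" (retrying cannot help) by inspecting the cause chain.
final class FatalCauseCheck {

    private FatalCauseCheck() {
    }

    // Returns true if any throwable in the cause chain is an instance of the given type.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        // The depth cap guards against a cyclic cause chain.
        for (int depth = 0; t != null && depth < 100; depth++) {
            if (type.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    // Assumption for this sketch: a payload that cannot be deserialized is fatal.
    static boolean isFatal(Throwable t) {
        return hasCause(t, InvalidObjectException.class);
    }

    public static void main(String[] args) {
        Throwable conversionFailure = new IllegalStateException("Listener threw exception",
                new IllegalArgumentException("Could not deserialize object",
                        new InvalidObjectException("enum constant FOOBOO does not exist")));
        Throwable businessFailure = new IllegalStateException("Listener threw exception",
                new RuntimeException("database unavailable"));

        System.out.println(isFatal(conversionFailure));
        System.out.println(isFatal(businessFailure));
    }
}
```

Walking the whole cause chain matters here because the container wraps the original failure in a ListenerExecutionFailedException, so checking only the top-level exception type would miss the conversion error.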

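Edit #2:

After some googling and following @Borislav Stoilov's second approach, I came up with the following solution, which works for me. Keep in mind that I changed the annotation on RabbitMQErrorHandler to @Service: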

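import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;

// "implements RabbitListenerConfigurer" from the original is omitted: it would require
// configureRabbitListeners(...) and is not needed to register the factory bean.
@Configuration
public class RabbitMQConsumerConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ErrorHandler myRabbitMQErrorHandler,
            ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setErrorHandler(myRabbitMQErrorHandler);
        return factory;
    }
}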

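import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;

@RequiredArgsConstructor
@Slf4j
@Service
public class RabbitMQErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        System.out.println("======================================================================================");
        System.out.println("error occurred in message listener and handled in error handler" + t.toString());
        System.out.println("======================================================================================");

        // Rejecting without requeue stops the broker from redelivering the same bad message.
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
    }
}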

Best answer

Have you tried using a dead letter queue? It is basically a queue that stores all undelivered or failed messages.

You need to define it as a bean, like this:

@Bean
Queue deadLetterQueueBean() {
    return QueueBuilder.durable("custom.dead.letter.queue").build();
}

Then define a listener for it, similar to the one you already have.
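Note that the bean above only declares the dead letter queue itself. For RabbitMQ to route rejected messages there, the original work queue normally has to be declared with dead-letter arguments. A minimal sketch of those arguments in plain Java follows. `x-dead-letter-exchange` and `x-dead-letter-routing-key` are standard RabbitMQ queue arguments, while the class and method names are hypothetical, and routing through the default exchange (empty name) is an assumption made to keep the example short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the arguments a work queue needs so that
// rejected (not requeued) messages are routed to a dead letter queue.
final class DeadLetterArguments {

    private DeadLetterArguments() {
    }

    static Map<String, Object> forQueue(String deadLetterQueueName) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        // Assumption: use the default exchange (empty name), which routes
        // by queue name, so no extra exchange or binding is required.
        arguments.put("x-dead-letter-exchange", "");
        arguments.put("x-dead-letter-routing-key", deadLetterQueueName);
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(forQueue("custom.dead.letter.queue"));
    }
}
```

With Spring AMQP, a map like this could be passed to the work queue's declaration, for example through QueueBuilder's withArguments method.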

This tutorial worked for me: https://docs.spring.io/autorepo/docs/spring-cloud-stream-binder-rabbit-docs/1.1.1.RELEASE/reference/html/rabbit-dlq-processing.html

Option 2: set an errorHandler on the SimpleMessageListenerContainer

class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable genericError) {
        // do something in case of error
    }
}

@Autowired
SimpleMessageListenerContainer simpleMessageListenerContainer;

...
simpleMessageListenerContainer.setErrorHandler(new CustomErrorHandler());

Regarding "java - Problem handling exceptions thrown by a RabbitMQ listener", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61710893/