gpt4 book ai didi

Spring-boot-starter RabbitMQ 全局错误处理

转载 作者:行者123 更新时间:2023-12-04 02:51:04 24 4
gpt4 key购买 nike

我正在使用 spring-boot-starter-amqp 1.4.2。生产者和消费者工作正常,但有时传入的 JSON 消息的语法不正确。这会导致以下(正确)异常:

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...

将来我可能会面临更多的异常(exception)。所以我想配置一个全局错误处理程序,这样如果任何一个消费者有任何异常,我都可以全局处理它。

注意:在这种情况下,消息根本没有到达消费者。我想在全局范围内处理消费者的此类异常。

请找到以下代码:

RabbitConfiguration.java
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

@Autowired
private CachingConnectionFactory cachingConnectionFactory;

@Bean
public MessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}

@Bean
@Primary
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}

}

消费者
@RabbitListener(
id = "book_queue",
bindings = @QueueBinding(
value = @Queue(value = "book.queue", durable = "true"),
exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
key = "book.queue"
)
)
public void handle(Message message) {
//Business Logic
}

任何人都可以帮助我在全局范围内处理错误处理程序。您的帮助应该是可观的。

根据 Gary 评论更新问题

正如你所说,我可以运行你的示例并获得预期的输出,我只是想根据你的示例尝试更多的负面案例,但我无法理解一些事情,
this.template.convertAndSend(queue().getName(), new Foo("bar"));

输出

收到:Foo [foo=bar]

上面的代码工作正常。现在我发送一些其他 bean 而不是“Foo”
this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));

输出

收到:Foo [foo=null]

消费者不应该接受这个消息,因为它是完全不同的 bean(Differ.class 不是 Foo.class)所以我期待它应该转到“ConditionalRejectingErrorHandler”。为什么它接受错误的有效负载并打印为 null ?如果我错了,请纠正我。

编辑 1:

加里,正如你所说,我在发送消息时设置了标题“ TypeId ”,但消费者仍然可以转换错误消息并且它没有抛出任何错误......请找到下面的代码,我有使用了您的代码示例并进行了以下修改,

1) 发送消息时添加“__TypeId__”,
this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
m.getMessageProperties().setHeader("__TypeId__","foo");
return m;
});

2) 在“Jackson2JsonMessageConverter”中添加“DefaultClassMapper”
@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
converter.setClassMapper(mapper);
return new Jackson2JsonMessageConverter();
}

最佳答案

覆盖 Boot 的监听器容器工厂 bean,如 Enable Listener Endpoint Annotations 中所述.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(myErrorHandler());
...
return factory;
}

您可以注入(inject) ErrorHandler 的自定义实现它将被添加到工厂创建的每个监听器容器中。
void handleError(Throwable t);

throwable 将是 ListenerExecutionFailedException从版本 1.6.7(启动 1.4.4)开始,它的 failedMessage 中有原始入站消息。属性(property)。

默认错误处理程序会考虑 MessageConversionException 等原因。是致命的(他们不会被重新排队)。

如果您希望保留该行为(对于此类问题是正常的),您应该抛出 AmqpRejectAndDontRequeueException处理错误后。

顺便说一句,你不需要 RabbitTemplate bean ;如果你只有一个 MessageConverter bean 在应用程序中,boot 将自动将其连接到容器和模板中。

由于您将覆盖 Boot 的工厂,因此您 必须在那里连接转换器。

编辑

您可以使用默认的 ConditionalRejectingErrorHandler , 但使用 FatalExceptionStrategy 的自定义实现注入(inject)它.事实上,你可以将它的 DefaultExceptionStrategy 子类化。并覆盖 isFatal(Throwable t) ,然后,处理错误后,返回 super.isFatal(t) .

编辑2

完整示例;发送 1 条好消息和 1 条坏消息:
package com.example;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
context.getBean(So42215050Application.class).runDemo();
context.close();
}

@Autowired
private RabbitTemplate template;

private void runDemo() throws Exception {
this.template.convertAndSend(queue().getName(), new Foo("bar"));
this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
return new Message("some bad json".getBytes(), m.getMessageProperties());
});
Thread.sleep(5000);
}

@RabbitListener(queues = "So42215050")
public void handle(Foo in) {
System.out.println("Received: " + in);
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setErrorHandler(errorHandler());
return factory;
}

@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}

@Bean
public Queue queue() {
return new Queue("So42215050", false, false, true);
}

@Bean
public MessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}

public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
logger.error("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
}
return super.isFatal(t);
}

}

public static class Foo {

private String foo;

public Foo() {
super();
}

public Foo(String foo) {
this.foo = foo;
}

public String getFoo() {
return this.foo;
}

public void setFoo(String foo) {
this.foo = foo;
}

@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}

}
}

结果:
Received: Foo [foo=bar]

2017-02-14 09:42:50.972 ERROR 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy : Failed to process inbound message from queue So42215050; failed message: (Body:'some bad json' MessageProperties [headers={TypeId=com.example.So42215050Application$Foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])



编辑3

JSON 不传达任何类型信息。默认情况下,要转换的类型将从方法参数类型中推断出来。如果您希望拒绝任何无法转换为该类型的内容,则需要适本地配置消息转换器。

例如:
@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
converter.setClassMapper(mapper);
return converter;
}

现在,当我将示例更改为发送 Bar而不是 Foo ...
public static class Bar {

...

}


this.template.convertAndSend(queue().getName(), new Bar("baz"));

我得到...
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]

但这仅在发件人设置 __TypeId__ 时才有效。 header (如果模板配置了相同的适配器,则模板会执行此操作)。

EDIT4
@SpringBootApplication
public class So42215050Application {

private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
context.getBean(So42215050Application.class).runDemo();
context.close();
}

@Autowired
private RabbitTemplate template;

private void runDemo() throws Exception {
this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
});
Message message = MessageBuilder
.withBody("{\"foo\":\"bar\"}".getBytes())
.andProperties(
MessagePropertiesBuilder
.newInstance()
.setContentType("application/json")
.build())
.build();
this.template.send(queue().getName(), message); // Success - default Foo class when no header
message.getMessageProperties().setHeader("__TypeId__", "foo");
this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
message.getMessageProperties().setHeader("__TypeId__", "bar");
this.template.send(queue().getName(), message); // fail - mapped to a Map
Thread.sleep(5000);
}

@RabbitListener(queues = "So42215050")
public void handle(Foo in) {
logger.info("Received: " + in);
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setErrorHandler(errorHandler());
return factory;
}

@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}

@Bean
public Queue queue() {
return new Queue("So42215050", false, false, true);
}

@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("foo", Foo.class);
mappings.put("bar", Object.class);
mapper.setIdClassMapping(mappings);
converter.setClassMapper(mapper);
return converter;
}

public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
logger.error("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
}
return super.isFatal(t);
}

}

public static class Foo {

private String foo;

public Foo() {
super();
}

public Foo(String foo) {
this.foo = foo;
}

public String getFoo() {
return this.foo;
}

public void setFoo(String foo) {
this.foo = foo;
}

@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}

}

public static class Bar {

private String foo;

public Bar() {
super();
}

public Bar(String foo) {
this.foo = foo;
}

public String getFoo() {
return this.foo;
}

public void setFoo(String foo) {
this.foo = foo;
}

@Override
public String toString() {
return "Bar [foo=" + this.foo + "]";
}

}

}

关于Spring-boot-starter RabbitMQ 全局错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42215050/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com