gpt4 book ai didi

java - 如何避免 SimpleMessageListenerContainer 在发生意外错误时关闭?

转载 作者:行者123 更新时间:2023-12-01 22:34:27 28 4
gpt4 key购买 nike

我正在使用 Java boot 1.4.0 和“spring-boot-starter-amqp”来连接到rabbitMq。消息生产者、消费者和rabbitMq服务器都在我的控制之下。在生产环境中的几个月里一切都运行良好。但突然我的消费者停止了,下面给出了异常(exception)情况。由于我只生成始终有效的消息,因此我不知道出了什么问题。

但这导致我的监听器容器关闭。因此我的消息处理停止了。我必须手动重新启动我的消息使用者程序。

所以我的问题是:

  1. 我们能否避免在任何意外情况下完全关闭监听器容器?
  2. 是否可以优雅地丢弃此类消息并使监听器容器保持 Activity 状态?
  3. 如果没有,那么有什么方法可以检查我的所有监听器容器是否正在运行,并在发现它们死亡时启动它们? (注意:我查看了 RabbitListenerEndpointRegistry.getListenerContainers()。但看起来它没有涵盖 SimpleMessageListenerContainer 容器。)

异常日志:

2017-02-20 12:42:18.441 ERROR 18014 --- [writeToDBQQueueListenerContainer-17] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.
java.lang.NoClassDefFoundError: org/springframework/messaging/handler/annotation/support/MethodArgumentNotValidException
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.causeIsFatal(ConditionalRejectingErrorHandler.java:110) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isFatal(ConditionalRejectingErrorHandler.java:97) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:72) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:625) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:852) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:685) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ClassNotFoundException: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:89) ~[KattaQueueManager-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
... 11 common frames omitted

还有一个异常(exception):

2017-02-20 12:42:18.674 ERROR 18014 --- [imageQueueListenerContainer-53] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.

java.lang.NoClassDefFoundError: com/rabbitmq/utility/Utility
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.checkShutdown(BlockingQueueConsumer.java:348) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:402) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1160) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.utility.Utility
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:89) ~[KattaQueueManager-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
... 7 common frames omitted

2017-02-20 12:42:18.675 ERROR 18014 --- [imageQueueListenerContainer-53] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer

我的消费者示例代码:

@Bean
public MessageConverter jsonMessageConverter(){
//return new JsonMessageConverter();
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();

converter.setClassMapper(new ClassMapper() {
@Override
public Class<?> toClass(MessageProperties properties) {
return String.class;
}

@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
});

return converter;
}

@Bean
public ConnectionFactory connectionFactory()
{
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory(_rabbitmqHost, _rabbitmqPort);
return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}

@Bean
TopicExchange exchange()
{
return new TopicExchange("MyExchange");
}

@Bean
public Queue mainQueue()
{
return new Queue("MyMainQ");
}

@Bean
public Binding mainRouteBinding()
{
return BindingBuilder.bind(mainQueue()).to(exchange()).with("MyMainQ");
}

@Bean
SimpleMessageListenerContainer mainQueueListenerContainer(
ConnectionFactory connectionFactory,
@Qualifier("mainQueueListenerAdapter") MessageListenerAdapter listenerAdapter)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(mainQueue());

container.setMessageConverter(jsonMessageConverter());

container.setMessageListener(listenerAdapter);
container.setConcurrentConsumers(1);
return container;
}

@Bean
MessageListenerAdapter mainQueueListenerAdapter(MainConsumer receiver)
{
MessageListenerAdapter msgAdapter = new MessageListenerAdapter(receiver, "receiveMessage");

msgAdapter.setMessageConverter(jsonMessageConverter());

return msgAdapter;
}

@Bean
MainConsumer getMainConsumer()
{
return new MainConsumer();
}

//
//The receiving method in MainConsumer class looks as given below
public void receiveMessage(String message)
{
// My business logic goes here ...
}

最佳答案

几个月前我遇到了同样的问题,这对我有帮助。如果您的 Rabbit Java 客户端版本在 4.0.0 之前,您不会自动恢复连接,您需要进行设置,如下所示:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

// connection that will recover automatically
factory.setAutomaticRecoveryEnabled(true);

// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

Connection conn = factory.newConnection();

尝试查看RabbitMQ的文档: Rabbit Client API

关于java - 如何避免 SimpleMessageListenerContainer 在发生意外错误时关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42501328/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com