- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个 SimpleMessageListenerContainer,正在与 RabbitMQ 和 Java 一起使用。在大多数情况下,我没有遇到任何问题,但是在某些情况下,当消息发送到队列时,似乎会出现异常,导致 SMLC 陷入循环,尝试关闭然后重新启动队列。
[10/03/15 17:09:38:161 UTC] 00000246 SimpleMessage W org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpIOException: java.io.IOException
[10/03/15 17:09:38:189 UTC] 00000246 SimpleMessage I org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run Restarting Consumer: tag=[null], channel=Cached Rabbit Channel: AMQChannel(amqp://epa_devint1@xx.xx.xx.xx:5782/,1), acknowledgeMode=AUTO local queue size=0
[10/03/15 17:09:39:164 UTC] 00000256 SimpleMessage W org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}
当我通过管理界面从队列中删除消息时,就不再有异常了。
我认为异常的原因是缺少 header 属性。
处理此异常的正确方法是什么,以便从队列中删除消息并退出关闭/重新启动逻辑?
public SimpleMessageListenerContainer createMessageListenerContainer(Object consumer, String exchangeName, String queueName, String routingKey) {
TopicExchange exchange = new TopicExchange(exchangeName);
Queue queue = new Queue(queueName,
MessagingConstants.RABBIT_MQ_QUEUE_DURABLE,
MessagingConstants.RABBIT_MQ_QUEUE_EXCLUSIVE,
MessagingConstants.RABBIT_MQ_QUEUE_AUTO_DELETE,
MessagingConstants.RABBIT_MQ_QUEUE_ARGUMENTS);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setConcurrentConsumers(MessagingConstants.RABBIT_MQ_CONCURRENT_CONSUMERS);
container.setErrorHandler(errorHandler);
container.setMessageListener(new MessageListenerAdapter(consumer, null));
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setAdviceChain(retryAdviceChainFactory.createRequestRequeueExceptionAwareRetryChain(MessagingConstants.RABBIT_MQ_RETRY_ATTEMPTS));
container.setChannelTransacted(true);
container.setTaskExecutor(taskExecutor);
container.setTransactionManager(transactionManager);
return container;
}
@Override
public void afterPropertiesSet() {
com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory() {
protected void configureSocket(Socket socket) throws IOException {
super.configureSocket(socket);
socket.setSoTimeout(propertiesHolder.getRabbitMQSocketTimeoutMS());
}
};
rabbitFactory.setConnectionTimeout(propertiesHolder.getRabbitMQConnectionTimeoutMS());
rabbitFactory.setRequestedHeartbeat(propertiesHolder.getRabbitMQRequestedHeartbeatInSeconds());
CachingConnectionFactory cachingFactory = new CachingConnectionFactory(rabbitFactory);
cachingFactory.setAddresses(propertiesHolder.getRabbitMQHost());
cachingFactory.setPort(propertiesHolder.getRabbitMQPort());
cachingFactory.setUsername(propertiesHolder.getRabbitMQUsername());
cachingFactory.setPassword(propertiesHolder.getRabbitMQPassword());
cachingFactory.setChannelCacheSize(propertiesHolder.getRabbitMQChannelCacheSize());
connectionFactory = cachingFactory;
retryAdviceChainFactory = new RetryAdviceChainFactory();
amqpAdmin = new RabbitAdmin(connectionFactory);
errorHandler = new ErrorHandler() {
@Override
public void handleError(Throwable e) {
LOG.error("Error occurred", e);
}
};
TopicExchange deadLetterExchange = new TopicExchange(MessagingConstants.RABBIT_MQ_DEAD_LETTER_EXCHANGE);
Queue deadLetterQueue = new Queue(
MessagingConstants.RABBIT_MQ_DEAD_LETTER_QUEUE,
MessagingConstants.RABBIT_MQ_QUEUE_DURABLE,
MessagingConstants.RABBIT_MQ_QUEUE_EXCLUSIVE,
MessagingConstants.RABBIT_MQ_QUEUE_AUTO_DELETE,
MessagingConstants.RABBIT_MQ_QUEUE_ARGUMENTS);
amqpAdmin.declareExchange(deadLetterExchange);
amqpAdmin.declareQueue(deadLetterQueue);
amqpAdmin.declareBinding(BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("#"));
messageRecoverer = new DeadLetterMessageRecoverer(rabbitTemplate(), deadLetterExchange);
}
JDK:1.6spring-rabbit: 1.1.3.Releasespring-framework:3.2.1.发布
最佳答案
您的问题是由消息处理的事务性质引起的。我的意思是:
container.setTransactionManager(transactionManager);
发生的情况是,在某些情况下(可能是消息头的一些问题,正如您所建议的)您遇到一些错误,并且没有得到正确处理。错误传播并到达 txManager,txManager 依次:
现在,当您遇到的问题不是您的消息的直接后果时,上述行为绝对有意义。假设您在处理消息时遇到超时;应用程序在处理时关闭;在所有这些情况下,稍后重新传递消息或将消息传递到另一个节点确实有意义。
在您的情况下,无论何时,也无论哪个节点收到错误消息,您都会遇到同样的问题,并且需要手动删除该消息。为了避免这种情况,您可以:
以编程方式过滤掉无效消息,并且也不将它们返回到队列
根据您的要求使用 RabbitMQ 功能来处理错误
关于java - 如何阻止 SimpleMessageListenerContainer 陷入关闭/重新启动循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28972994/
我可以理解org.springframework.util.backoff存在于简单的用例中并用于核心 spring 模块。 (与使用 spring-retry 相比)。 但我的问题是,为什么spri
当某个事件到达时,我使用 container.stop(); 停止我的 rabbitmq listner,并在所需的工作完成后,使用 重新启动它container.start(),但是当新事件到达时,
我有一个传入数据流作为单独的消息发送到 RabbitMQ。 我想将这些发送到需要一批消息的服务。当我有一批 1000 条消息或 5 秒过期时,我需要将请求发送到服务。使用 SimpleMessageL
在任何可能的错误(缺少队列、连接问题等)上关闭 SimpleMessageListenerContainer(以编程方式创建,而不是作为 bean)并创建新的(在运行时重新声明所有绑定(bind))的
我尝试使用 spring boot 连接到 RabbitMQ。连接应始终重新启动/重试连接。我在发生致命异常后重新连接时遇到问题。应用程序永远不会断开连接,也不会无限期地重试连接。 @Bean
我正在尝试在 spring amqp 中实现恢复。我已经使用下面的代码来实现相同的 RetryOperationsInterceptor retryInterceptorBuilder =RetryI
我正在使用 SimpleMessageListenerContainer作为通过 AMQP 进行远程处理的基础。只要在进程启动时可以访问 RabbitMQ 代理,一切都会顺利进行。但是,如果由于任何原
我有 2 个正在收听的 RabbitMQ 队列。对于 1 个队列,我需要一个消费者来确保一次只处理一条消息。 第二个队列可以在一秒钟内接收大量消息,我需要能够一次使用很多消息。 我有以下代码,它确实按
我正在使用 Java boot 1.4.0 和“spring-boot-starter-amqp”来连接到rabbitMq。消息生产者、消费者和rabbitMq服务器都在我的控制之下。在生产环境中的几
有没有办法将 SimpleMessageListenerContainer 初始化为停止状态? 即我不想使用任何 JMS 消息,直到稍后用户执行启动 JMS 容器的操作 我尝试在创建 bean 后立即
从 this question 开始,我们有一个 Rabbit 凭证失效的场景,我们需要在我们的 CachingConnectionFactory 上调用 resetConnection() 来获取一
我试图更好地了解如何在我的应用程序中实现构造函数注入(inject)。我有一些由 SimpleMessageListenerContainer 工作人员执行的后台进程,这些进程从 AMQP 服务器中提
我在 springrabbitmq 消费者端不断收到以下警告。它一直在寻找重新启动。 [SimpleAsyncTaskExecutor-3317] WARN org.springframework.
我正在使用 Spring Cloud 库来轮询 SQS。如何设置轮询间隔? @Bean @Primary public AmazonSQSAsync amazonSQSAsync() { re
最近我们遇到了一种情况,消息被消费者从队列中取出,但没有到达与该队列绑定(bind)的监听器。我们之所以能够做出这个断言,是因为我们的监听器上有一个拦截器(打印日志消息),该拦截器不会为这些消息触发。
我有一个 SimpleMessageListenerContainer,正在与 RabbitMQ 和 Java 一起使用。在大多数情况下,我没有遇到任何问题,但是在某些情况下,当消息发送到队列时,似乎
我们一直在使用一个框架来使用Spring AMQP,其中框架设置了SimpleMessageListenerContainer.setDefaultRequeueRejected(false), 这意
RabbitMQ 提供了选择性设置预取计数的功能。 使用 spring-amqp 的 SimpleMessageListenerContainer,我注意到预取计数始终被设置。我无法将预取计数设置为
我正在使用 springs SimpleMessageListenerContainer 来消费来自 RabbitMQ 队列的消息。一切正常,但是当向队列发送无效消息(例如无效的 json)时,监听器
我目前正在学习一些有关 RabbitMQ + SpringAMQ 的知识,我试图使我的 SimpleMessageListenerContainer 能够读取传递到我的两个队列的消息,但只有一个正在接
我是一名优秀的程序员,十分优秀!