gpt4 book ai didi

java - Rabbitmq死信spring集成xml

转载 作者:行者123 更新时间:2023-12-01 08:53:09 26 4
gpt4 key购买 nike

您好,我正在尝试用 Spring 的 XML 配置实现死信交换机(dead letter exchange)。场景是:AAA 交换机绑定了 BBB 队列,如果 BBB 队列的监听器(listener)在处理消息时抛出异常,我希望把这条失败的消息转发到死信队列中保存。代码如下:

创建示例项目

main.java

package com.spring.rabbit.first.deadletter;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point of the dead-letter sample: loads the Spring application context
 * described by {@code /applicationContext.xml} on the classpath.
 */
public class Main {

    /**
     * Creates the application context and registers a JVM shutdown hook for it.
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {
        // The context is intentionally left open after main returns. The shutdown
        // hook closes it when the JVM exits instead of abandoning it unclosed.
        new ClassPathXmlApplicationContext("/applicationContext.xml").registerShutdownHook();
    }
}

xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<!-- Spring configuration -->

<context:component-scan base-package="com.spring.rabbit.first" />
<context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

<!-- RabbitMQ common configuration -->

<!-- Broker connection: localhost:5672, virtual host "/", guest/guest credentials. -->
<rabbit:connection-factory id="connectionFactory"
username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />


<!-- <rabbit:connection-factory id="connectionFactory"/> -->
<!-- Template and admin, both wired to the connection factory declared above. -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />

<!-- Queues -->

<!-- Non-durable, auto-delete queue "spring.queue".
     NOTE(review): no listener or binding visible in this file refers to it; confirm it is still needed. -->
<rabbit:queue id="springQueue" name="spring.queue"
auto-delete="true" durable="false" />

<!-- Listener container: consumes from BBBqueue and hands each message to the
     messageListener bean; deliveries are wrapped by the retryAdvice advice chain. -->
<rabbit:listener-container
connection-factory="connectionFactory" advice-chain="retryAdvice">
<rabbit:listener queues="BBBqueue" ref="messageListener" />
</rabbit:listener-container>

<!-- The listener implementation referenced by the container above. -->
<bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />



<!-- Stateless retry interceptor used as the listener container's advice chain.
     Retries follow retryTemplate; once they are exhausted the message is passed
     to the configured messageRecoverer. -->
<bean id="retryAdvice"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
<property name="retryOperations" ref="retryTemplate" />
</bean>

<!-- Recoverer invoked after the last failed attempt.
     NOTE(review): as its name says, it presumably rejects the message without requeueing,
     which is what lets the broker dead-letter it; confirm against the spring-amqp docs. -->
<bean id="rejectAndDontRequeueRecoverer"
class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />


<!-- Retry schedule: at most 3 attempts, with exponential back-off between them. -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<!-- First wait 2000 ms, each following wait multiplied by 10, capped at 30000 ms
     (so the waits are 2000 ms, then 20000 ms). -->
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="2000" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="30000" />
</bean>
</property>
<property name="retryPolicy">
<!-- Give up after the third attempt. -->
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
</property>
</bean>



<!-- Topic exchange named "AAAqueue" (despite the name, this is an exchange, not a queue),
     bound to BBBqueue with an empty pattern.
     NOTE(review): an empty topic pattern presumably matches only an empty routing key;
     confirm this is the intended binding. -->
<rabbit:topic-exchange name="AAAqueue">
<rabbit:bindings>
<rabbit:binding queue="BBBqueue" pattern="" />
</rabbit:bindings>
</rabbit:topic-exchange>


<!-- Work queue consumed by the listener container. Its arguments name XXX.dead.letter as
     the dead-letter exchange and set a message TTL of 10000 (presumably milliseconds; confirm). -->
<rabbit:queue name="BBBqueue">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
</rabbit:queue-arguments>
</rabbit:queue>


<!-- dead letter -->

<!-- Dead-letter exchange named by BBBqueue's x-dead-letter-exchange argument. -->
<rabbit:topic-exchange name="XXX.dead.letter">
<rabbit:bindings>
<!-- "#" matches every routing key. BBBqueue sets no x-dead-letter-routing-key, so
     dead-lettered messages arrive here with their original routing key (for example
     "BBBqueue"). An empty pattern would not match that key and the message would be dropped. -->
<rabbit:binding queue="XXX.dead.letter.queue" pattern="#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- Queue that stores the dead-lettered messages. -->
<rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>



</beans>

消息处理程序

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Listener used to exercise the failure path: it prints every delivery and
 * then always fails.
 */
public class MessageHandler implements MessageListener {

    /**
     * Prints the message and its body as text, then throws to simulate a
     * processing failure.
     *
     * @param message the delivered message
     * @throws NullPointerException always, after the message has been printed
     */
    @Override
    public void onMessage(Message message) {
        System.out.println("Received message: " + message);
        // Decode explicitly as UTF-8 instead of the platform default charset, so the
        // printed text does not depend on the JVM's locale settings.
        System.out.println("Text: " + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

        // This listener fails on purpose for every message.
        throw new NullPointerException();
    }
}

消息发送者

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;

/**
 * JMX-exposed service that publishes text messages through the injected
 * {@link AmqpTemplate}.
 */
@Service
@ManagedResource
public class MessageSender {

    /** Exchange targeted by {@link #send(String)}. */
    private static final String FANOUT_EXCHANGE = "amq.fanout";

    /** Routing key used by {@link #send(String)}. */
    private static final String FANOUT_ROUTING_KEY = "NDPAR.SPRING.JAVA";

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes {@code text} to the fixed exchange and routing key of this sender.
     *
     * @param text the payload to convert and send
     */
    @ManagedOperation
    public void send(String text) {
        send(FANOUT_EXCHANGE, FANOUT_ROUTING_KEY, text);
    }

    /**
     * Converts {@code text} to a message and publishes it.
     *
     * @param exchange name of the exchange to publish to
     * @param key routing key to publish with
     * @param text the payload to convert and send
     */
    @ManagedOperation
    public void send(String exchange, String key, String text) {
        this.amqpTemplate.convertAndSend(exchange, key, text);
    }
}

输出:

23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer -
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

但我在死信队列中仍然看不到任何消息。是不是还缺少什么配置?请帮忙解决这个问题。

最佳答案

我不确定您的 XXX-channel 和适配器是用来做什么的,但您需要把 RejectAndDontRequeueRecoverer 设置到重试通知(retry advice)工厂 bean 的 messageRecoverer 属性上。

默认的恢复器(recoverer)只会记录一条“重试次数已用完”的日志,然后丢弃消息。

编辑

下面是一个自定义的 MessageRecoverer:它把队列 A 中处理失败的消息重新发布到名为 A.dlq 的队列;所需的队列和绑定会按需自动声明。

/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;

/**
 * {@link MessageRecoverer} that republishes a failed message to a per-queue
 * dead-letter queue named {@code <consumerQueue>.dlq}. The direct exchange
 * {@code DLX}, the queue and its binding are declared through a
 * {@link RabbitAdmin} when they are first needed.
 */
public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

    /** Header carrying the full stack trace of the failure. */
    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

    /** Header carrying the message of the failure's cause (or of the failure itself). */
    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

    /** Header carrying the exchange the message was received from. */
    public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

    /** Header carrying the routing key the message was received with. */
    public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

    /** Name of the exchange failed messages are republished to. */
    private static final String DEAD_LETTER_EXCHANGE_NAME = "DLX";

    private final Log logger = LogFactory.getLog(getClass());

    private final RabbitTemplate errorTemplate;

    private final RabbitAdmin admin;

    private final DirectExchange deadletterExchange = new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);

    /** Set once the dead-letter exchange has been declared by this instance. */
    private boolean initialized;

    /**
     * Creates a recoverer that publishes with {@code errorTemplate} and declares
     * broker objects through an admin built on the template's connection factory.
     *
     * @param errorTemplate template used to republish failed messages
     */
    public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
        this.errorTemplate = errorTemplate;
        this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
    }

    /**
     * Adds failure details to the message headers and republishes the message to
     * the dead-letter queue of the queue it was consumed from.
     *
     * @param message the message whose processing failed
     * @param cause the failure reported for the message
     */
    @Override
    public void recover(Message message, Throwable cause) {
        declareExchangeOnce();

        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, stackTraceOf(cause));
        // Prefer the nested cause's message when the failure wraps another exception.
        Throwable nested = cause.getCause();
        String failureText = (nested == null) ? cause.getMessage() : nested.getMessage();
        headers.put(X_EXCEPTION_MESSAGE, failureText);
        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());

        String dlqName = message.getMessageProperties().getConsumerQueue() + ".dlq";
        boolean dlqMissing = this.admin.getQueueProperties(dlqName) == null;
        if (dlqMissing) {
            declareAndBind(dlqName);
        }

        // The queue name doubles as the routing key of its binding on the direct exchange.
        this.errorTemplate.send(DEAD_LETTER_EXCHANGE_NAME, dlqName, message);

        if (!this.logger.isWarnEnabled()) {
            return;
        }
        this.logger.warn("Republishing failed message to " + dlqName);
    }

    /** Declares the dead-letter exchange the first time a message is recovered. */
    private void declareExchangeOnce() {
        if (this.initialized) {
            return;
        }
        this.admin.declareExchange(this.deadletterExchange);
        this.initialized = true;
    }

    /** Declares the queue {@code name} and binds it to the dead-letter exchange under its own name. */
    private void declareAndBind(String name) {
        Queue deadLetterQueue = new Queue(name);
        this.admin.declareQueue(deadLetterQueue);
        this.admin.declareBinding(
                BindingBuilder.bind(deadLetterQueue).to(this.deadletterExchange).with(name));
    }

    /** Renders the stack trace of {@code cause} as a single string. */
    private String stackTraceOf(Throwable cause) {
        StringWriter trace = new StringWriter();
        cause.printStackTrace(new PrintWriter(trace, true));
        return trace.toString();
    }

}

关于java - Rabbitmq死信spring集成xml,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42245593/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com