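Hello, I am trying to implement a dead-letter exchange in Spring Integration XML. The scenario: the AAA exchange is bound to the BBB queue. If the BBB queue listener fails in some situation, for example by throwing an exception, I want the message routed to the dead-letter exchange's queue so that it is stored there. The code is below.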
Sample project
Main.java
package com.spring.rabbit.first.deadletter;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("/applicationContext.xml");
    }
}
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Spring configuration -->
    <context:component-scan base-package="com.spring.rabbit.first" />
    <context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

    <!-- RabbitMQ common configuration -->
    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />
    <!-- <rabbit:connection-factory id="connectionFactory"/> -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queues -->
    <rabbit:queue id="springQueue" name="spring.queue"
        auto-delete="true" durable="false" />

    <rabbit:listener-container
        connection-factory="connectionFactory" advice-chain="retryAdvice">
        <rabbit:listener queues="BBBqueue" ref="messageListener" />
    </rabbit:listener-container>

    <bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />

    <bean id="retryAdvice"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>

    <bean id="rejectAndDontRequeueRecoverer"
        class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="2000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
    </bean>

    <rabbit:topic-exchange name="AAAqueue">
        <rabbit:bindings>
            <rabbit:binding queue="BBBqueue" pattern="" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:queue name="BBBqueue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- dead letter -->
    <rabbit:topic-exchange name="XXX.dead.letter">
        <rabbit:bindings>
            <rabbit:binding queue="XXX.dead.letter.queue" pattern=""></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>
</beans>
Message handler
package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageHandler implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("Received message: " + message);
        System.out.println("Text: " + new String(message.getBody()));
        message = null;
        if (message == null) {
            throw new NullPointerException();
        }
    }
}
Message sender
package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;

@Service
@ManagedResource
public class MessageSender {

    @Autowired
    private AmqpTemplate template;

    @ManagedOperation
    public void send(String text) {
        send("amq.fanout", "NDPAR.SPRING.JAVA", text);
    }

    @ManagedOperation
    public void send(String exchange, String key, String text) {
        template.convertAndSend(exchange, key, text);
    }
}
Output:
23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer -
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
But I still don't see any messages in the dead-letter queue. Is anything missing? Please help me solve this.
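The pauses in the log ("Sleeping for 2000", then "Sleeping for 20000") follow from the backoff settings in the XML: initialInterval 2000, multiplier 10.0, maxInterval 30000, maxAttempts 3. The sketch below reproduces that arithmetic in plain Java. The class and method names are hypothetical, and it illustrates the schedule only; it is not Spring Retry's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    // Pauses (in ms) between attempts: the first pause is initialInterval,
    // each later pause is multiplied by multiplier and capped at maxInterval.
    static List<Long> sleeps(long initialInterval, double multiplier,
                             long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        // maxAttempts attempts have maxAttempts - 1 pauses between them.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the retryTemplate bean in the question.
        System.out.println(sleeps(2000, 10.0, 30000, 3)); // prints [2000, 20000]
    }
}
```

With these settings the listener is invoked three times over roughly 22 seconds of backoff before the recoverer runs, which matches the "Retry failed last attempt: count=3" line in the log.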
Best answer
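I'm not sure what your XXX-channel and adapters are supposed to do, but you need to add a RejectAndDontRequeueRecoverer to the retry advice factory bean (in the messageRecoverer property).
The default recoverer only logs that the retries are exhausted and discards the message.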
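The configuration shown in the question already wires in RejectAndDontRequeueRecoverer, so one further thing that may be worth checking (this is an assumption on the editor's part, not something the answer states) is the binding on the dead-letter exchange. RabbitMQ republishes a dead-lettered message with its original routing key unless x-dead-letter-routing-key is set on the queue, and the log shows receivedRoutingKey=BBBqueue. XXX.dead.letter is a topic exchange bound with pattern "", which would not match that key. The sketch below is a simplified plain-Java matcher for AMQP topic patterns, with hypothetical names, that illustrates the difference between "" and "#".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class TopicPatternCheck {

    // Splits a key or pattern into dot-separated words; "" has no words.
    static List<String> words(String s) {
        return s.isEmpty() ? Collections.<String>emptyList()
                           : Arrays.asList(s.split("\\.", -1));
    }

    // Topic matching: "*" matches exactly one word, "#" matches zero or more.
    static boolean matches(String pattern, String routingKey) {
        return matches(words(pattern), 0, words(routingKey), 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // "#" may consume zero or more words of the routing key.
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;
        }
        return (word.equals("*") || word.equals(k.get(j)))
                && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("", "BBBqueue"));         // prints false
        System.out.println(matches("#", "BBBqueue"));        // prints true
        System.out.println(matches("BBBqueue", "BBBqueue")); // prints true
    }
}
```

If this is indeed the cause, binding XXX.dead.letter.queue with pattern "#", or setting x-dead-letter-routing-key on BBBqueue to a key the binding matches, would be the things to try.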
Edit
Here is a custom MessageRecoverer that republishes failed messages from queue A to a queue named A.dlq; the queue and binding are declared automatically as needed.
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;

public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
    public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";
    public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

    private final Log logger = LogFactory.getLog(getClass());
    private final RabbitTemplate errorTemplate;
    private final RabbitAdmin admin;
    private final String deadLetterExchangeName = "DLX";
    private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);
    private boolean initialized;

    public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
        this.errorTemplate = errorTemplate;
        this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
    }

    @Override
    public void recover(Message message, Throwable cause) {
        if (!this.initialized) {
            initialize();
        }
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
        headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
        String dlqName = message.getMessageProperties().getConsumerQueue() + ".dlq";
        if (this.admin.getQueueProperties(dlqName) == null) {
            bindDlq(dlqName);
        }
        this.errorTemplate.send(this.deadLetterExchangeName, dlqName, message);
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Republishing failed message to " + dlqName);
        }
    }

    private void initialize() {
        this.admin.declareExchange(this.deadletterExchange);
        this.initialized = true;
    }

    private void bindDlq(String dlqName) {
        Queue dlq = new Queue(dlqName);
        this.admin.declareQueue(dlq);
        this.admin.declareBinding(BindingBuilder.bind(dlq).to(this.deadletterExchange).with(dlqName));
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }
}
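To see what the recoverer attaches to a republished message without running a broker, its header logic can be exercised in isolation. The sketch below mirrors the header-building part of recover() using a plain Map. The class name, method names, and sample exception are hypothetical and are not part of Spring AMQP.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

class RecoveryHeadersSketch {

    // Builds the same four headers that recover() adds to the failed message.
    static Map<String, Object> headersFor(Throwable cause, String exchange, String routingKey) {
        Map<String, Object> headers = new HashMap<>();
        StringWriter stackTrace = new StringWriter();
        cause.printStackTrace(new PrintWriter(stackTrace, true));
        headers.put("x-exception-stacktrace", stackTrace.toString());
        // Prefer the nested cause's message: the exception passed to the
        // recoverer may be a wrapper around the listener's own exception.
        headers.put("x-exception-message",
                cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put("x-original-exchange", exchange);
        headers.put("x-original-routingKey", routingKey);
        return headers;
    }

    // The dead-letter queue name is derived from the consumer queue name.
    static String dlqNameFor(String consumerQueue) {
        return consumerQueue + ".dlq";
    }

    public static void main(String[] args) {
        Throwable failure = new RuntimeException("Listener threw exception",
                new IllegalStateException("boom"));
        Map<String, Object> headers = headersFor(failure, "", "BBBqueue");
        System.out.println(headers.get("x-exception-message")); // prints boom
        System.out.println(dlqNameFor("BBBqueue"));             // prints BBBqueue.dlq
    }
}
```

Note that an exception created without a message, such as the bare NullPointerException thrown by the question's handler, would leave x-exception-message as null.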
Source: the Stack Overflow question "java - RabbitMQ dead letter with Spring Integration XML": https://stackoverflow.com/questions/42245593/