- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在从事一个 Spring 项目,并且正在尝试为 RabbitMQ 队列实现带有死信的指数退避。
在此过程中,我创建了一个死信队列和一个死信交换 (Fanout),并将原始队列的 x-dead-letter-exchange 参数设置为死信交换的名称,并创建了一个带有 ExponentialBackOffPolicy 的 RetryTemplate .
出于测试目的,我的消费者只是通过抛出异常来拒绝它收到的所有消息。
这是我的 RabbitMQConfiguration 类的样子:
@Configuration
@EnableAutoConfiguration
@PropertySource("file:${HOME}/common/config/wave-planning.properties")
public class RabbitMQConfiguration {
private final static String QUEUE_NAME = "orderPlanQueue";
private static final String EXCHANGE_NAME = "orderPlanExchange";
private static final String DL_EXCHANGE_NAME = "deadLetterExchange";
private static final String DL_QUEUE_NAME = "deadLetterQueue";
@Value("${rabbitmq.host:localhost}")
private String host;
@Value("${rabbitmq.port:5672}")
private int port;
@Value("${rabbitmq.user:guest}")
private String userName;
@Value("${rabbitmq.password:guest}")
private String password;
@Value("${rabbitmq.initial_backoff_interval:1000}")
private int INITIAL_INTERVAL_IN_MILLISECONDS;
@Value("${rabbitmq.max_backoff_interval:10000}")
private int MAX_INTERVAL_IN_MILLISECONDS;
@Autowired
OrderPlanService orderPlanService;
@Bean
Queue queue() {
Map<String, Object> qargs = new HashMap<String, Object>();
qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
return new Queue(QUEUE_NAME, false, false, false, qargs);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
FanoutExchange deadLetterExchange() { return new FanoutExchange(DL_EXCHANGE_NAME); }
@Bean
Queue deadLetterQueue() { return new Queue(DL_QUEUE_NAME); }
@Bean
Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public MessageConverter Jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retry = new RetryTemplate();
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
policy.setMultiplier(2);
policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);
retry.setBackOffPolicy(policy);
template.setRetryTemplate(retry);
template.setRoutingKey(QUEUE_NAME);
template.setMessageConverter(Jackson2JsonMessageConverter());
return template;
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageConverter(Jackson2JsonMessageConverter());
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setDefaultRequeueRejected(false);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(orderPlanService, "consume");
}
}
消费者的相关部分基本上是这样的:
@Service
@Transactional
public class BaseOrderPlanService implements OrderPlanService {
....
@Override
public void consume(Object object) {
throw new IllegalArgumentException("Test");
}
}
对于 Autowiring 的整数值,使用默认值。
在运行它时,我看到交换器和队列是按预期在 rabbitmq 上创建的,具有预期的绑定(bind)和相关参数。
但是,当我使用路由键“orderPlanQueue”将消息传递给 orderPlanExchange 时,它会导致无限循环,因为消息在队列中反复被拒绝和替换。
另一方面,如果 IllegalArgumentException 被替换为 AmqpRejectAndDontRequeueException,则在第一次拒绝尝试时,消息将被简单地扔进死信队列。
如果有人能指出我在这里可能做错了什么,即未应用重试策略,我将不胜感激。
编辑:根据 Artem 的建议使用 StatefulRetryOperationsInterceptor 进行编码。
@Configuration
@EnableAutoConfiguration
@PropertySource("file:${HOME}/common/config/wave-planning.properties")
public class RabbitMQConfiguration {
private final static String QUEUE_NAME = "orderPlanQueue";
private static final String EXCHANGE_NAME = "orderPlanExchange";
private static final String DL_EXCHANGE_NAME = "deadLetterExchange";
private static final String DL_QUEUE_NAME = "deadLetterQueue";
@Value("${rabbitmq.host:localhost}")
private String host;
@Value("${rabbitmq.port:5672}")
private int port;
@Value("${rabbitmq.user:guest}")
private String userName;
@Value("${rabbitmq.password:guest}")
private String password;
@Value("${rabbitmq.initial_backoff_interval:1000}")
private int INITIAL_INTERVAL_IN_MILLISECONDS;
@Value("${rabbitmq.max_backoff_interval:10000}")
private int MAX_INTERVAL_IN_MILLISECONDS;
@Autowired
OrderPlanService orderPlanService;
@Bean
Queue queue() {
Map<String, Object> qargs = new HashMap<String, Object>();
qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
return new Queue(QUEUE_NAME, false, false, false, qargs);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
}
@Bean
FanoutExchange deadLetterExchange() { return new FanoutExchange(DL_EXCHANGE_NAME); }
@Bean
Queue deadLetterQueue() { return new Queue(DL_QUEUE_NAME); }
@Bean
Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public MessageConverter Jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
/*
RetryTemplate retry = new RetryTemplate();
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
policy.setMultiplier(2);
policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);
retry.setBackOffPolicy(policy);
template.setRetryTemplate(retry);
*/
template.setRoutingKey(QUEUE_NAME);
template.setMessageConverter(Jackson2JsonMessageConverter());
return template;
}
@Bean
StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(4)
.backOffOptions(INITIAL_INTERVAL_IN_MILLISECONDS, 2, MAX_INTERVAL_IN_MILLISECONDS)
.build();
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageConverter(Jackson2JsonMessageConverter());
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setAdviceChain(new Advice[] {interceptor()});
return container;
}
@Bean
MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(orderPlanService, "consume");
}
}
最佳答案
RabbitTemplate
的重试策略与 DLQ/DLX 完全无关。这是针对消费者的。
看引用手册的区别here :
you can now configure the
RabbitTemplate
to use aRetryTemplate
to help with handling problems with broker connectivity.
和here :
To put a limit in the client on the number of re-deliveries, one choice is a
StatefulRetryOperationsInterceptor
in the advice chain of the listener.
因此,您必须重新考虑您的逻辑并将重试功能添加到 SimpleMessageListenerContainer
定义中。
关于java - Spring + RabbitMQ Exponential Backoff with RetryTemplate 无响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45019585/
假设您要计算 5^65537而不是相乘 5 65537次,建议做((5^2)^16)*5 .这导致 16 次平方和 1 次乘法。 但我的问题是你不是通过对非常大的数字进行平方来补偿平方次数吗?当您进入
寻找一种使用 Esper (EPL) 语句计算 5 个 EMA5 和 EMA20 窗口内的指数移动平均线的方法。 我收到了价格事件流(时间戳、交易品种和价格),我在 5 的滑动窗口上编写了一个简单移动
我有一组值遵循 exponential distribution .现在,我想计算速率参数 alpha。谁能帮我计算一下(我正在使用 C++ 编写代码)? 最佳答案 如果您知道这些值来自指数分布,那么
我在两台不同的机器上安装了 Python3.6,发行版 Anaconda。我不能发誓我使用了相同的安装程序文件,尽管我认为我使用过。当我尝试检查 Python、Anaconda 和 numpy 版本时
我的互斥量实现只是在 CAS 失败时重试(伪代码): while(!compare_and_swap(&mutex, 0, 1)); 但它浪费了太多的 CPU 周期。 我了解到“指数退避”可以提高整体
背景信息 我正在设置一个函数,它根据开始日期和结束日期创建一个日期数组。 该函数将接收开始和结束日期,这些日期首先被格式化为 year-month-dayT12:00:00:00 格式,然后使用 .g
因此,我在这里编写了一个脚本,将学生添加到类(class)中(Google Classroom API)。 students = getStudents('Year10', '10A') # VAR
一个字符串一般有多少个子串? Why does string x [1:n] have O(n^2) subtrings according to the lecture 21 Dynamic Pro
当我减去两个值时,我得到 awk 结果,错误是我得到指数值 2.7755575615629E-17 而不是 0。我缺少应用的任何内容,请提出建议。仅在某些情况下才会发生这种情况,例如 0.66、0.6
我可能遗漏了一些明显的东西,但是如何计算 SAS 中的“幂”? 例如 X 的平方,还是 Y 的立方? 我需要的是变量1 ^ 变量2,但找不到语法...(我使用的是SAS 9.1.3) 最佳答案 明白了
当我减去两个值时,我得到了 awk 结果,错误是我得到的指数值是 2.7755575615629E-17 而不是 0。任何我遗漏的申请,请提出建议。这些情况仅在某些情况下发生,例如 0.66、0.67
我使用 Numpy 的 numpy.random.exponential 函数已有一段时间了。我现在看到 Python 的 random 模块有很多我不知道的功能。它有什么东西可以替代 numpy.r
我正在从事一个 Spring 项目,并且正在尝试为 RabbitMQ 队列实现带有死信的指数退避。 在此过程中,我创建了一个死信队列和一个死信交换 (Fanout),并将原始队列的 x-dead-le
我经常使用以下模式来创建对象 null/undefined省略属性: const whatever = { something: true, ...(a ? { a } : null),
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题吗? 通过 editing this post 添加细节并澄清问题. 关闭 5 年前。 Improve t
我正在阅读 GCM:https://developers.google.com/cloud-messaging/server 其中一项要求是服务器需要能够: 处理请求并使用指数退避重新发送它们。 我的
我们正在使用 Beanstalk 上的工作层来发送 webhook。我们需要使用指数退避,以防在联系第三方时出现任何错误。但是,我不清楚这将如何工作。 如果作业失败并且我调用了 ChangeMes
我需要标记我的 y 轴,以便它首先显示单词“Power”,然后是方括号中的表达式:[micro Volt squared]。 我可以生成我想要的整个标签的单个部分,但是当我想组合它们时遇到了问题: x
如何使用范围(0-99)内的不同分布在 Java 中生成随机数。 我知道标准的 Java.util.Random getNextInt() 使用 Uniform 和 PRNG。我将如何使用 nextG
我知道当 RPC 调用失败时,指数退避是一件好事。到目前为止,在我的 GAE/P 应用程序中,我已经通过使用任务队列实现了指数退避: deferred.defer(function_that_make
我是一名优秀的程序员,十分优秀!