- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Spring Boot系列教程之死信队列详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
前言 。
在说死信队列之前,我们先介绍下为什么需要用死信队列.
如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可.
ack机制和requeue-rejected属性 。
我们还是基于上篇《spring boot系列——7步集成rabbitmq》的demo代码来说.
在项目springboot-demo我们看到application.yaml文件部分配置内容如下 。
1
2
3
4
5
6
7
8
9
10
|
...
listener:
type: simple
simple:
acknowledge-mode: auto
concurrency:
5
default
-requeue-rejected:
true
max-concurrency:
100
...
|
其中 。
acknowledge-mode 。
该配置项是用来表示消息确认方式,其有三种配置方式,分别是none、manual和auto.
none意味着没有任何的应答会被发送.
manual意味着监听者必须通过调用channel.basicack()来告知所有的消息.
auto意味着容器会自动应答,除非messagelistener抛出异常,这是默认配置方式.
default-requeue-rejected 。
该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true.
我一开始对于这个属性有个误解,我以为rejected是表示拒绝,所以将requeue-rejected连起来是拒绝重新放回队列,后来查了资料明白这个属性的功能才想起来rejected是个形容词,其表示的应该是被拒绝的消息 。
所以如果该属性配置为true表示会重新放回队列,如果配置为false表示不会放回队列.
下面我们看看acknowledge-mode参数和default-requeue-rejected参数使用不同的组合方式,rabbitmq是如何处理消息的.
代码依然使用springboot-demo中的rabbitapplicationtests发送消息,使用receiver类监听demo-queue队列的消息.
对于receiver类添加了一行代码,该代码模拟抛出异常 。
1
2
3
4
5
6
7
8
9
|
@component
public
class
receiver {
@rabbitlistener
(queues =
"demo_queue"
)
public
void
created(string message) {
system.out.println(
"orignal message: "
+ message);
int
i =
1
/
0
;
}
}
|
acknowledge-mode=none, default-requeue-rejected=false 。
该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。通过在rabbitmq管理页面也没有看到重新放回队列的消息 。
acknowledge-mode=none, default-requeue-rejected=true 。
同样该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。而且即使default-requeue-rejected配置为true因为没有确认所以也没有看到重新放回队列的消息 。
acknowledge-mode=manual, default-requeue-rejected=false 。
该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,个人理解是因为没有收到ack,所以消息又回到了队列中.
acknowledge-mode=manual, default-requeue-rejected=true 。
该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,所以消息被重新放入到队列中了,并且在控制台发现还抛出了异常(这块不是很清楚,default-requeue-rejected设置true和false带来的不同效果,有了解的麻烦下方留言指教).
acknowledge-mode=auto, default-requeue-rejected=false 。
该配置采用自动确认,从结果来看,是自动确认了.
从控制台打印的结果可以看出receiver方法执行了3次,分别是前面两条放回队列的消息以及这次发送的消息,所以3条消息都消费了.
同时因为default-requeue-rejected设置为false,所以即使消费抛出异常,也没有将消息放回队列.
acknowledge-mode=auto, default-requeue-rejected=true 。
该配置同样采用自动确认,从结果看出,没有抛出异常(这块也不是很理解),且因为default-requeue-rejected设置为true,所以消息重新回到队列.
综上罗列这么多情况只为说明有些情况下,如果消息消费出错,因为配置问题导致消息丢失了。这在很多情况下是要命的,比如用户支付的订单号,如果因为抛异常等原因直接丢失是很要命的.
所以,我们需要有一个确保机制,能够保证即使失败的消息也能保存下来,这时候死信队列就排上用场了.
死信队列 。
死信队列的整个设计思路是这样的 。
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> dlx交换机 -->队列 --> 消费者 。
下面我们通过网上的一个简单的死信队列的实现看看如何使用死信队列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@bean
(
"deadletterexchange"
)
public
exchange deadletterexchange() {
return
exchangebuilder.directexchange(
"dl_exchange"
).durable(
true
).build();
}
@bean
(
"deadletterqueue"
)
public
queue deadletterqueue() {
map<string, object> args =
new
hashmap<>(
2
);
// x-dead-letter-exchange 声明 死信交换机
args.put(
"x-dead-letter-exchange"
,
"dl_exchange"
);
// x-dead-letter-routing-key 声明 死信路由键
args.put(
"x-dead-letter-routing-key"
,
"key_r"
);
return
queuebuilder.durable(
"dl_queue"
).witharguments(args).build();
}
@bean
(
"redirectqueue"
)
public
queue redirectqueue() {
return
queuebuilder.durable(
"redirect_queue"
).build();
}
/**
* 死信路由通过 dl_key 绑定键绑定到死信队列上.
*
* @return the binding
*/
@bean
public
binding deadletterbinding() {
return
new
binding(
"dl_queue"
, binding.destinationtype.queue,
"dl_exchange"
,
"dl_key"
,
null
);
}
/**
* 死信路由通过 key_r 绑定键绑定到死信队列上.
*
* @return the binding
*/
@bean
public
binding redirectbinding() {
return
new
binding(
"redirect_queue"
, binding.destinationtype.queue,
"dl_exchange"
,
"key_r"
,
null
);
}
|
注意 。
声明了一个direct模式的exchange.
声明了一个死信队列deadletterqueue,该队列配置了一些属性x-dead-letter-exchange表明死信交换机,x-dead-letter-routing-key表明死信路由键,因为是direct模式,所以需要设置这个路由键.
声明了一个替补队列redirectqueue,变成死信的消息最终就是存放在这个队列的.
声明绑定关系,分别是死信队列以及替补队列和交换机的绑定.
那么如何模拟生成一个死信消息呢,可以在发送到dl_queue的消息在10秒后失效,然后转发到替补队列中,代码实现如下 。
1
2
3
4
5
6
7
8
9
10
11
12
|
public
void
sendmsg(string content) {
correlationdata correlationid =
new
correlationdata(uuid.randomuuid().tostring());
messagepostprocessor messagepostprocessor = message -> {
messageproperties messageproperties = message.getmessageproperties();
// 设置编码
messageproperties.setcontentencoding(
"utf-8"
);
// 设置过期时间10*1000毫秒
messageproperties.setexpiration(
"5000"
);
return
message;
};
rabbittemplate.convertandsend(
"dl_exchange"
,
"dl_key"
, content, messagepostprocessor);
}
|
执行结果如下 。
消息首先进入dl_queue,5秒后失效,被转发到redirect_queue中.
总结 。
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我的支持.
原文链接:https://www.cnblogs.com/bigdataZJ/p/springboot-deadletter-queue.html 。
最后此篇关于Spring Boot系列教程之死信队列详解的文章就讲到这里了,如果你想了解更多关于Spring Boot系列教程之死信队列详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在创建一个死信 channel 错误处理程序,如下所示 errorHandler(deadLetterChannel("direct:myDLC").useOriginalMessage().ma
以下是我的 Camel 路线代码 .errorHandler(deadLetterChannel("jmstx:queue:ErrorHandler") .useOriginalMessage()
我正在尝试跨不同的集群系统进行分布式发布-订阅,但无论我尝试什么,它都不起作用。 我想做的就是创建一个简单的示例。 1)我创建一个主题,说“内容”。 2) 假设 jvm A 中的一个节点创建主题、订阅
我是一名优秀的程序员,十分优秀!