- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Spring Boot与RabbitMQ结合实现延迟队列的示例由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
背景 。
何为延迟队列?
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费.
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列.
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备.
延迟队列能做什么?
延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:
1、延迟消费。比如:
2、延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试.
如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成.
如何实现?
别急,在下文中,我们将详细介绍如何利用spring boot加rabbitmq来实现延迟队列.
本文出现的示例代码都已push到github仓库中:https://github.com/lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue 。
实现思路 。
在介绍具体的实现思路之前,我们先来介绍一下rabbitmq的两个特性,一个是time-to-live extensions,另一个是dead letter exchanges.
time-to-live extensions 。
rabbitmq允许我们为消息或者队列设置ttl(time to live),也就是过期时间。ttl表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了ttl或者当某条消息进入了设置了ttl的队列时,这条消息会在经过ttl秒后“死亡”,成为dead letter。如果既配置了消息的ttl,又配置了队列的ttl,那么较小的那个值会被取用。更多资料请查阅官方文档.
dead letter exchange 。
刚才提到了,被设置了ttl的消息在过期后会成为dead letter。其实在rabbitmq中,一共有三种消息的“死亡”形式:
如果队列设置了dead letter exchange(dlx),那么这些dead letter就会被重新publish到dead letter exchange,通过dead letter exchange路由到其他队列。更多资料请查阅官方文档.
流程图 。
聪明的你肯定已经想到了,如何将rabbitmq的ttl和dlx特性结合在一起,实现一个延迟队列.
针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:
延迟消费 。
延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过rabbitmq提供的ttl扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的dlx转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果.
。
延迟重试 。
延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍.
如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费.
。
代码实现 。
接下来我们将介绍如何在spring boot中实现基于rabbitmq的延迟队列。我们假设读者已经拥有了spring boot与rabbitmq的基本知识.
初始化工程 。
首先我们在intellij中创建一个spring boot工程,并且添加spring-boot-starter-amqp扩展.
配置队列 。
从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在rabbitmq中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:
我们通过java config的方式将上述的队列配置为bean。由于我们添加了spring-boot-starter-amqp扩展,spring boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的dlx配置为同一个,且过期的消息都会通过dlx转发到delay_process_queue.
delay_queue_per_message_ttl 。
首先介绍delay_queue_per_message_ttl的配置代码:
1
2
3
4
5
6
7
|
@bean
queue delayqueuepermessagettl() {
return
queuebuilder.durable(delay_queue_per_message_ttl_name)
.withargument(
"x-dead-letter-exchange"
, delay_exchange_name)
// dlx,dead letter发送到的exchange
.withargument(
"x-dead-letter-routing-key"
, delay_process_queue_name)
// dead letter携带的routing key
.build();
}
|
其中,x-dead-letter-exchange声明了队列里的死信转发到的dlx名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称.
delay_queue_per_queue_ttl 。
类似地,delay_queue_per_queue_ttl的配置代码:
1
2
3
4
5
6
7
8
|
@bean
queue delayqueueperqueuettl() {
return
queuebuilder.durable(delay_queue_per_queue_ttl_name)
.withargument(
"x-dead-letter-exchange"
, delay_exchange_name)
// dlx
.withargument(
"x-dead-letter-routing-key"
, delay_process_queue_name)
// dead letter携带的routing key
.withargument(
"x-message-ttl"
, queue_expiration)
// 设置队列的过期时间
.build();
}
|
delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl,该配置用来设置队列的过期时间.
delay_process_queue 。
delay_process_queue的配置最为简单:
1
2
3
4
5
|
@bean
queue delayprocessqueue() {
return
queuebuilder.durable(delay_process_queue_name)
.build();
}
|
配置exchange 。
配置dlx 。
首先,我们需要配置dlx,代码如下:
1
2
3
4
|
@bean
directexchange delayexchange() {
return
new
directexchange(delay_exchange_name);
}
|
然后再将该dlx绑定到实际消费队列即delay_process_queue上。这样所有的死信都会通过dlx被转发到delay_process_queue:
1
2
3
4
5
6
|
@bean
binding dlxbinding(queue delayprocessqueue, directexchange delayexchange) {
return
bindingbuilder.bind(delayprocessqueue)
.to(delayexchange)
.with(delay_process_queue_name);
}
|
配置延迟重试所需的exchange 。
从延迟重试的流程图中我们可以看到,消息处理失败之后,我们需要将消息转发到缓冲队列,所以缓冲队列也需要绑定一个exchange。在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列。具体代码是如何配置的,这里就不赘述了,大家可以查阅我github中的代码.
定义消费者 。
我们创建一个最简单的消费者processreceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:
另外,我们还需要新建一个监听容器用于存放消费者,代码如下:
1
2
3
4
5
6
7
8
|
@bean
simplemessagelistenercontainer processcontainer(connectionfactory connectionfactory, processreceiver processreceiver) {
simplemessagelistenercontainer container =
new
simplemessagelistenercontainer();
container.setconnectionfactory(connectionfactory);
container.setqueuenames(delay_process_queue_name);
// 监听delay_process_queue
container.setmessagelistener(
new
messagelisteneradapter(processreceiver));
return
container;
}
|
至此,我们前置的配置代码已经全部编写完成,接下来我们需要编写测试用例来测试我们的延迟队列.
编写测试用例 。
延迟消费场景 。
首先我们编写用于测试ttl设置在消息上的测试代码.
我们借助spring-rabbit包下提供的rabbittemplate类来发送消息。由于我们添加了spring-boot-starter-amqp扩展,spring boot会在初始化时自动地将rabbittemplate当成bean加载到容器中.
解决了消息的发送问题,那么又该如何为每个消息设置ttl呢?这里我们需要借助messagepostprocessor.
messagepostprocessor通常用来设置消息的header以及消息的属性。我们新建一个expirationmessagepostprocessor类来负责设置消息的ttl属性: 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/**
* 设置消息的失效时间
*/
public
class
expirationmessagepostprocessor
implements
messagepostprocessor {
private
final
long
ttl;
// 毫秒
public
expirationmessagepostprocessor(
long
ttl) {
this
.ttl = ttl;
}
@override
public
message postprocessmessage(message message)
throws
amqpexception {
message.getmessageproperties()
.setexpiration(ttl.tostring());
// 设置per-message的失效时间
return
message;
}
}
|
然后在调用rabbittemplate的convertandsend方法时,传入expirationmessagepostporcessor即可。我们向缓冲队列中发送3条消息,过期时间依次为1秒,2秒和3秒。具体的代码如下所示:
1
2
3
4
5
6
7
8
9
10
|
@test
public
void
testdelayqueuepermessagettl()
throws
interruptedexception {
processreceiver.latch =
new
countdownlatch(
3
);
for
(
int
i =
1
; i <=
3
; i++) {
long
expiration = i *
1000
;
rabbittemplate.convertandsend(queueconfig.delay_queue_per_message_ttl_name,
(object) (
"message from delay_queue_per_message_ttl with expiration "
+ expiration),
new
expirationmessagepostprocessor(expiration));
}
processreceiver.latch.await();
}
|
细心的朋友一定会问,为什么要在代码中加一个countdownlatch呢?这是因为如果没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,我们就看不到消息被延迟消费的表现了.
那么类似地,测试ttl设置在队列上的代码如下:
1
2
3
4
5
6
7
8
9
|
@test
public
void
testdelayqueueperqueuettl()
throws
interruptedexception {
processreceiver.latch =
new
countdownlatch(
3
);
for
(
int
i =
1
; i <=
3
; i++) {
rabbittemplate.convertandsend(queueconfig.delay_queue_per_queue_ttl_name,
"message from delay_queue_per_queue_ttl with expiration "
+ queueconfig.queue_expiration);
}
processreceiver.latch.await();
}
|
我们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过期.
延迟重试场景 。
我们同样还需测试延迟重试场景.
1
2
3
4
5
6
7
8
|
@test
public
void
testfailmessage()
throws
interruptedexception {
processreceiver.latch =
new
countdownlatch(
6
);
for
(
int
i =
1
; i <=
3
; i++) {
rabbittemplate.convertandsend(queueconfig.delay_process_queue_name, processreceiver.fail_message);
}
processreceiver.latch.await();
}
|
我们向delay_process_queue发送3条会触发fail的消息,理论上这3条消息会在4秒后自动重试.
查看测试结果 。
延迟消费场景 。
延迟消费的场景测试我们分为了ttl设置在消息上和ttl设置在队列上两种。首先,我们先看一下ttl设置在消息上的测试结果:
从上图中我们可以看到,processreceiver分别经过1秒、2秒、3秒收到消息。测试结果表明消息不仅被延迟消费了,而且每条消息的延迟时间是可以被个性化设置的。ttl设置在消息上的延迟消费场景测试成功.
然后,ttl设置在队列上的测试结果如下图:
从上图中我们可以看到,processreceiver经过了4秒的延迟之后,同时收到了3条消息。测试结果表明消息不仅被延迟消费了,同时也证明了当ttl设置在队列上的时候,消息的过期时间是固定的。ttl设置在队列上的延迟消费场景测试成功.
延迟重试场景 。
接下来,我们再来看一下延迟重试的测试结果:
processreceiver首先收到了3条会触发fail的消息,然后将其移动到缓冲队列之后,过了4秒,又收到了刚才的那3条消息。延迟重试场景测试成功.
总结 。
本文首先介绍了延迟队列的概念以及用途,并且通过代码详细讲解了如何通过spring boot和rabbitmq实现一个延迟队列。希望本文能够对大家平时的学习和工作能有所启发和帮助。也希望大家多多支持我.
原文链接:http://www.kissyu.org/ 。
最后此篇关于Spring Boot与RabbitMQ结合实现延迟队列的示例的文章就讲到这里了,如果你想了解更多关于Spring Boot与RabbitMQ结合实现延迟队列的示例的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
有人可以解释一下 spring-boot-parent 和 spring-boot-starter-parent 之间的区别吗,正如我在下面附加的 GIT HUB 代码链接之一中看到的,他们为 spr
我有与 jersey 框架集成的 Spring Boot 应用程序。 现在,当我尝试运行该应用程序时,它只是停留在 Spring 启动徽标上,之后没有任何 react 。 我也尝试添加 -X ,但徽标
我指的是 Spring Boot 关于 的文档自动配置 和 执行器 模块: 自动配置: Spring Boot AutoConfiguration attempts to automatically
我正在尝试将 apache log4j 集成到我的 Spring boot 应用程序中。这是我的 build.gradle 文件: build.gradle buildscript { rep
使用 Spring Boot Maven 插件的以下命令在生产中启动 Spring Boot 应用程序是否是一个好主意或实践? mvn spring-boot:run 最佳答案 不,这是个坏主意。 您
据我所知,spring boot 和 spring session 为我们提供了一站式自动配置,但是当我的应用程序使用 session redis 和应用程序缓存 redis 时,不是同一个 redi
我希望使用Spring Boot创建一个新的Web应用程序。不幸的是,我的服务器在技术堆栈方面相当有限。它安装了Java 5。 谁能告诉我spring boot是否可以在Java 1.5上运行以及什么
我有3个实体 CarWash(设置Wash) Wash(car_wash_id FK到CarWash) WashComment(wash_id FK到Wash) 有什么办法可以写这个查询 @Qu
我一直在关注this文章。 我正在尝试在Spring-boot应用程序中优雅地处理gRPC错误,的主要目标是能够在gRPC客户端中获取错误状态。 在上面的文章之后,我坚持为异常添加拦截器。如何在Spr
我有一个要使用的自定义log4j布局插件。在IntelliJ中运行或与./gradlew bootRun一起运行时,插件可以正常工作。不使用./gradlew bootJar构建启动jar。 启用-D
我想在给定范围 (5001-5100) 的随机端口上启动 Spring Cloud 应用程序(Spring Boot 1.5.14,Spring Cloud Edgware.SR4)。我知道我们可以使
任何人都可以向我展示或指出不使用 spring boot gradle 插件的 spring boot gradle 项目。 我正在寻找类似不使用 gradle 插件的 spring boot sta
我当时尝试包含上述依赖项之一,但找不到任何区别: spring boot starter web:我可以看到 Flux 和 Mono 类并制作一个响应式(Reactive)休息 Controller
我们一直在为我们的应用程序使用 Springboot 1.X。 现在准备开始一些新的应用程序,想知道我们是应该使用 SpringBoot2.0 还是坚持使用 SpringBoot 1.X? 对一种方式
我希望记录应用程序正在加载 application-profile.propeties 或 application.yml。怎么做。在哪种方法中,我可以听取它并检测它是成功加载还是失败。 最佳答案 您
当我在 pom.xml 中添加简单的 spring-boot-starter-data-jpa 依赖项时,在 pom.xml 文件中出现错误。如果我删除该依赖项,则不会再有错误。我不确定为什么会发生这
我希望记录应用程序正在加载 application-profile.propeties 或 application.yml。怎么做。在哪种方法中,我可以听取它并检测它是成功加载还是失败。 最佳答案 您
我在网上看了很多关于 spring-boot-devtools 的文章和问题,但仍然无法弄清楚为什么它对我不起作用。每次运行我的应用程序时,我都会得到以下信息: 17:54:28.057 [main]
我正在尝试将现有的 Spring 应用程序移植到 Spring Boot。我不使用 spring-boot-starter-data-solr 启动器,但是我的类路径上有 apache solrj (
(这主要是一个历史问题。Pivotal 建议所有论坛讨论都在 StackOverflow 上进行,这就是我在这里问它的原因。) Spring Boot 项目用来证明将应用程序的类和依赖项从可执行 ja
我是一名优秀的程序员,十分优秀!