- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
大家好,我是三友~~ 。 前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了. 虽然基于MQ这个方式走不通了,但是这个项目中使用到Redis,所以我就想是否能够使用Redis来代替MQ实现延迟队列的功能,于是我就查了一下有没有现成可用的方案,别说,还真给我查到了两种方案,并且我还仔细研究对比了这两个方案,发现要想很好的实现延迟队列,并不简单. 基于监听过期key的方式来实现延迟队列是我查到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收获 。 一谈到发布订阅模式,其实一想到的就是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图: 图中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic. 生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息. 在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变化. 这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知. 这些默认的channel被分为两类: 以 __keyspace@<db>__: 为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件. 举个例子,现在有个消费者监听了 __keyspace@0__:sanyou 这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息 。 以 __keyevent@<db>__: 为前缀,后面跟的是消息事件类型,表示监听某个事件 。 同样举个例子,现在有个消费者监听了 __keyevent@0__:expired 这个channel,代表了监听key的过期事件。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息。如果把expired换成del,那么监听的就是删除事件。具体支持哪些事件,可从官网查. 上述db是指具体的数据库,Redis不是默认分为16个库么,序号从0-15,所以db就是0到15的数字,示例中的0就是指0对应的数据库. 通过对上面的两个概念了解之后,应该就对监听过期key的实现原理一目了然了,其实就是当这个key过期之后,Redis会发布一个key过期的事件到 __keyevent@<db>__:expired 这个channel,只要我们的服务监听这个channel,那么就能知道过期的Key,从而就算实现了延迟队列功能. 所以这种方式实现延迟队列就只需要两步: 好了,基本概念和核心原理都说完了之后,又到了show me the code环节. 好巧不巧,Spring已经实现了监听 __keyevent@*__:expired 这个channel这个功能, __keyevent@*__:expired 中的 * 代表通配符的意思,监听所有的数据库. 所以demo写起来就很简单了,只需3步即可 。 引入pom 。 配置类 。 KeyExpirationEventMessageListener实现了对 __keyevent@*__:expired channel的监听 。 当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件 。 所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息. 对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener 。 整个工程目录也简单 。 代码写好,启动应用 。 之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s 。 expire sanyou 5 。 如果上面都理论都正确,不出意外的话,5s后MyRedisKeyExpiredEventListener应该可以监听到sanyou这个key过期的消息,也就相当于拿到了延迟任务,控制台会打印出 获取到延迟消息:sanyou . 于是我满怀希望,静静地等待了5s。. 5、4、3、2、1,时间一到,我查看控制台,但是控制台并没有按照预期打印出上面那句话. 为什么会没打印出?难道是代码写错了?正当我准备检查代码的时候,官网的一段话道出了真实原因. 我给大家翻译一下上面这段话讲的内容. 上面这段话主要讨论的是key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略,就是面试八股文常背的两种: 再后面那段话是核心,意思是说,key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布. 到这我终于明白了,上面的例子中即使我设置了5s的过期时间,但是当5s过去之后,只要两种清除策略都不满足,没人访问sanyou这个key,后台的定时清理的任务也没扫描到sanyou这个key,那么就不会发布key过期的事件,自然而然也就监听不到了. 至于后台的定时清理的任务什么时候能扫到,这个没有固定时间,可能一到过期时间就被扫到,也可能等一定时间才会被扫到,这就可能会造成了客户端从发布到监听到的消息时间差会大于等于过期时间,从而造成一定时间消息的延迟,这就着实有点坑了。. 除了上面测试demo的时候遇到的坑之外,在我深入研究之后,还发现了一些更离谱的坑. 丢消息太频繁 。 Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息. Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费. 所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了. 消息消费只有广播模式 。 Redis的发布订阅模式消息消费只有广播模式一种. 所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息. 如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息. 所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量. 接收到所有key的某个事件 。 这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题. 当消费者监听了以 __keyevent@<db>__: 开头的消息,那么会导致所有的key发生了事件都会被通知给消费者. 举个例子,某个消费者监听了 __keyevent@*__:expired 这个channel,那么只要key过期了,不管这个key是张三还会李四,消费者都能收到. 所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务. 所以,综上能够得出一个非常重要的结论,那就是监听Redis过期Key这种方式实现延迟队列, 不稳定,坑贼多! 。 那有没有比较靠谱的延迟队列的实现方案呢?这就不得不提到我研究的第二种方案了. Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能. 先来个demo,后面再来说说这种实现的原理. 引入pom 。 封装了一个RedissonDelayQueue类 。 这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓. 当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务. 添加任务的时候是通过RDelayedQueue的offer方法添加的. controller类,通过接口添加任务,延迟时间为5s 。 启动项目,在浏览器输入如下连接,添加任务 。 静静等待5s,成功获取到任务. 如下图就是上面demo中,一个延迟队列会在Redis内部使用到的channel和数据类型 。 SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀. 有了这些概念之后,再来看看整体的运行原理图 。 这段lua脚本主要干了两件事: 当客户端监听到 redisson_delay_queue_channel:SANYOU 这个channel的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到过期时间的延迟任务的到期时间戳)- 当前时间戳,这个时间其实也就是 redisson_delay_queue_channel:SANYOU 中最早到过期时间的任务还剩余的延迟时间. 此处可以等待10s,好好想想。. 这样,一旦时间来到了上面说的最早到过期时间任务的到期时间戳, redisson_delay_queue_timeout:SANYOU 中上面说的最早到过期时间的任务已经到期了,客户端的延迟任务也同时到期,于是开始执行lua脚本操作,及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中,如此循环往复,一直运行下去,保证 redisson_delay_queue_timeout:SANYOU 中到期的数据能及时放到目标队列中. 所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列. 这里再补充两个特殊情况,图中没有画出: 第一个就是如果 redisson_delay_queue_timeout:SANYOU 是新添加的任务(队列之前有或者没有任务)是队列中最早需要被执行的,也会发布消息到channel,之后就按时上面说的流程走了. 添加任务代码如下,也是通过lua脚本来的 。 第二种特殊情况就是项目启动的时候会执行一次客户端延迟任务。项目在重启时,由于没有客户端延迟任务的执行,可能会出现 redisson_delay_queue_timeout:SANYOU 队列中有到期但是没有被放到目标队列的可能,重启就执行一次就是为了保证到期的数据能被及时放到目标队列中. 现在来比较一下第一种方案和Redisson的这种方案,看看有没有第一种方案的那些坑. 第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟. 第二个丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据. 第三个广播消费任务的问题,这个是不会出现的,因为每个客户端都是从同一个目标队列中获取任务的. 第四个问题是Redis内部channel发布事件的问题,跟这种方案不沾边,就更不可能存在了. 所以,通过上面的对比可以看出,Redisson这种实现方案就显得更加的靠谱了. 往期热门文章推荐 。 写出漂亮代码的45个小技巧 。 扒一扒Bean注入到Spring的那些姿势 。 三万字盘点Spring/Boot的那些常用扩展点 。 两万字盘点那些被玩烂了的设计模式 。 RocketMQ保姆级教程 。 RocketMQ消息短暂而又精彩的一生 。 扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题. 背景
监听过期key
1、Redis发布订阅模式
2、keyspace notifications
3、延迟队列实现原理
__keyevent@<db>__:expired
这个channel,处理延迟任务
4、demo
< dependency >
< groupId >
org.springframework.boot
</ groupId >
< artifactId >
spring-boot-starter-data-redis
</ artifactId >
< version >
2.2.5.RELEASE
</ version >
</ dependency >
< dependency >
< groupId >
org.springframework.boot
</ groupId >
< artifactId >
spring-boot-starter-web
</ artifactId >
< version >
2.2.5.RELEASE
</ version >
</ dependency >
@Configuration
public
class RedisConfiguration
{
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer (RedisConnectionFactory connectionFactory)
{
RedisMessageListenerContainer redisMessageListenerContainer =
new
RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
return
redisMessageListenerContainer;
}
@Bean
public KeyExpirationEventMessageListener redisKeyExpirationListener (RedisMessageListenerContainer redisMessageListenerContainer)
{
return
new
KeyExpirationEventMessageListener(redisMessageListenerContainer);
}
}
@Component
public
class MyRedisKeyExpiredEventListener implements ApplicationListener < RedisKeyExpiredEvent >
{
@Override
public void onApplicationEvent (RedisKeyExpiredEvent event)
{
byte
[] body = event.getSource();
System.out.println(
"获取到延迟消息:"
+
new
String(body));
}
}
set sanyou task
5、坑
Redisson实现延迟队列
1、demo
< dependency >
< groupId >
org.redisson
</ groupId >
< artifactId >
redisson
</ artifactId >
< version >
3.13.1
</ version >
</ dependency >
@Component
@Slf
4j
public
class RedissonDelayQueue
{
private
RedissonClient redissonClient;
private
RDelayedQueue<String> delayQueue;
private
RBlockingQueue<String> blockingQueue;
@PostConstruct
public void init ()
{
initDelayQueue();
startDelayQueueConsumer();
}
private void initDelayQueue ()
{
Config config =
new
Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress(
"redis://localhost:6379"
);
redissonClient = Redisson.create(config);
blockingQueue = redissonClient.getBlockingQueue(
"SANYOU"
);
delayQueue = redissonClient.getDelayedQueue(blockingQueue);
}
private void startDelayQueueConsumer ()
{
new
Thread(() -> {
while
(
true
) {
try
{
String task = blockingQueue.take();
log.info(
"接收到延迟任务:{}"
, task);
}
catch
(Exception e) {
e.printStackTrace();
}
}
},
"SANYOU-Consumer"
).start();
}
public void offerTask (String task, long seconds)
{
log.info(
"添加延迟任务:{} 延迟时间:{}s"
, task, seconds);
delayQueue.offer(task, seconds, TimeUnit.SECONDS);
}
}
@RestController
public
class RedissonDelayQueueController
{
@Resource
private
RedissonDelayQueue redissonDelayQueue;
@GetMapping
(
"/add"
)
public void addTask (@RequestParam( "task" ) String task)
{
redissonDelayQueue.offerTask(task,
5
);
}
}
http://localhost:8080/add?task=sanyou
2、实现原理
redisson_delay_queue_timeout:SANYOU
,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
redisson_delay_queue:SANYOU
,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。
SANYOU
,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
redisson_delay_queue_channel:SANYOU
,是一个channel,用来通知客户端开启一个延迟任务
redisson_delay_queue_timeout:SANYOU
中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳
redisson_delay_queue_timeout:SANYOU
中移除,存到
SANYOU
这个目标队列
redisson_delay_queue_timeout:SANYOU
中目前最早到过期时间的延迟任务的到期时间戳,然后发布到
redisson_delay_queue_channel:SANYOU
这个channel中
3、与第一种方案比较
最后此篇关于用Redis实现延迟队列,我研究了两种方案,发现并不简单的文章就讲到这里了,如果你想了解更多关于用Redis实现延迟队列,我研究了两种方案,发现并不简单的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在使用一个简单的脚本来延迟加载页面上的所有图像;图像源的路径包含在 data-src 属性中,然后放入 img 标记的实际 src 属性中。几乎大多数(?)延迟加载方法的实现都是如何工作的。 这是
我有一个具有多层 (SKNodes) 背景、游戏层、前景和 HUD 的场景,每个场景中都有多个 SKSpriteNode,用于滚动和您可以收集和点击的对象。 hud 层只有一个 SKSpriteNod
我有一个 Controller 函数来创建一些东西。调用该函数时,将运行 setInterval 来获取项目的状态。 这是服务: (function () { 'use strict';
在我的应用程序中,我播放音频直播,延迟非常重要。我正在使用 AVPlayer,但启动需要 5-6 秒,并且我需要最多 3 秒的延迟。我怎样才能更快地开始播放并减少延迟?设置一个小缓冲区就可以了?如何使
我有一个恼人的问题。我有这个简单的服务器代码(比方说): #!/usr/bin/env python3 import wsgiref.simple_server def my_func(env, st
我是 jquery deferreds 的新手。这里我有一个简单的example 。 谁能告诉我为什么在其他函数完成之前就触发完成函数(“现在是我的时间”)? 这里的人 example还创建一个延迟对
正在放置关闭 之前的标签标记相同的 sa 将它们放在 中部分并指定 defer="defer"属性? 最佳答案 是/否。 是的,因为放置 defer 标签会等到文档加载完毕后再执行。 否,因为放置
我知道Javascript没有delay(500)方法,它会延迟执行500毫秒,所以我一直试图通过使用setTimeout和setInterval来解决这个问题。 for(var i =0; i< 1
我们有一个读写主服务器和复制的从读服务器。在某些网络用例中,数据被发布并立即读取以发送回服务器。立即读取是在读取从属设备上完成的,由于延迟,数据尚未在那里更新。 我知道这可能是复制设置的一个常见问题,
我有以下 dag 设置以从 2015 年开始运行追赶。对于每个执行日期,任务实例在一分钟内完成。但是,第二天的任务仅在 5 分钟窗口内开始。例如。上午 10:00、上午 10:05、上午 10:10
当我在 WatchKit 中推送一个新 Controller 并在新 Controller 的awakeWithContext: 方法中使用 setTitle 时,它需要一秒钟左右来设置标题,直到
我将图像显示为 SVG 文件和文本。 出于某种原因,svg 图像的渲染速度比屏幕的其余部分慢,从而导致延迟,这对用户体验不利。 这种延迟正常吗?我该怎么做才能让整个屏幕同时呈现? Row( ma
我正在考虑在我的应用程序中使用 firebase 动态链接。我需要将唯一标识符从电子邮件生成的链接传递到用户应用程序中。当用户安装了应用程序时,这可以正常工作,但是,我对未安装应用程序的方式有些困惑。
您知道如何使用 JQuery 的延迟方法和一个函数来检测所有已更改的表单并将每个表单作为 Ajax 帖子提交吗? 如果我只列出大量表单提交,我可以得到同样的结果,但如果我使用... $('form.c
我需要一种方法来通过回调获取不同的脚本。这个方法工作正常: fetchScripts:function() { var _this=this; $.when( $.aj
我编写了一个 jquery 脚本,允许我淡入和淡出 div,然后重复。该代码运行良好。但是,当我尝试添加延迟(我希望 div 在淡出之前保持几秒钟)时,它无法正常工作。我尝试在代码中的几个地方添加延迟
我正在努力在延迟、带宽和吞吐量之间划清界限。 有人可以用简单的术语和简单的例子来解释我吗? 最佳答案 水比喻: 延迟 是穿过管子所需的时间。 带宽是管有多宽。 水流量为吞吐量 车辆类比: 从源到目的地
我有一个 CRM 系统,当添加联系人时,我想将他们添加到会计系统中。 我在 CRM 系统中设置了一个 Webhook,将联系人传递给 Azure 函数。 Azure 函数连接到会计系统 API 并在那
我有一个 Android AudioTrack,例如: private AudioTrack mAudioTrack; int min = AudioTrack.getMinBufferSize(sa
我正在 React 中开发一个 TODO 应用程序,并尝试构建将删除选中项目延迟 X 秒的功能,并且如果在这段时间内未选中该框,它将不会被删除。 我遇到的主要问题是当用户在同一 X 秒内检查、取消检查
我是一名优秀的程序员,十分优秀!