- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章基于Redis6.0 部署迷你版本消息队列由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作.
通过相关的技术调研后,决定挑选基于Redis实现消息系统.
具体技术选型原因:
为了方便让读者们从0到1地学习这块内容,我将会从环节搭建开始介绍起.
基于redis6.0.6版本搭建一套简单的消息队列系统。 环境部署:
docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6
如果本地没有相关镜像,可以尝试通过搭建下方命令进行镜像的拉取:
docker pull redis:6.0.6
当redis的基础环境配置好了之后,接下来便是基于redis内置的一些基本功能开发一款消息队列组件了.
下边我将分三种不同的技术方案来介绍如何实现一款轻量级的消息队列.
基于常规的队列结构来实现消息队列 。
这块的实现比较简单,主要是基于Redis内部的List结构来落地的,发送方将消息从队列的左边写入,然后消费方从队列的右边读取.
package org.idea.mq.redis.framework.mq.list; import com.alibaba.fastjson.JSON; import org.idea.mq.redis.framework.bean.MsgWrapper; import org.idea.mq.redis.framework.mq.IMQTemplate; import org.idea.mq.redis.framework.redis.IRedisService; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Author linhao * @Date created in 3:09 下午 2022/2/7 */ @Componentpublic class RedisListMQTemplate implements IMQTemplate { @Resource private IRedisService iRedisService; @Override public boolean send(MsgWrapper msgWrapper) { try { String json = JSON.toJSONString(msgWrapper.getMsgInfo()); iRedisService.lpush(msgWrapper.getTopic(),json); return true; }catch (Exception e){ e.printStackTrace(); } return false; } }
这里存在几个问题点需要思考下:
这里我建议可以按照系统的项目名称前缀+业务标识来组织.
例如:用户系统中需要发布一条 会员已升级 的消息给到下游系统,此时可以将这条消息写入到名为:user-service:member-upgrade-list 的List集合中.
如果订单系统希望访问用户系统的消息,则需要在redis的key里指定user-service:member-upgrade-list关键字.
在这里插入图片描述 。
消息的监听机制如何实现?
对于List的消息可以采用轮询的方式获取,例如下边这段案例代码:
/** * 轮询的方式获取数据 * * @param msgWrapper */ private void pollingGet(MsgWrapper msgWrapper) { while (true) { String value = iRedisService.rpop(msgWrapper.getTopic()); if (!StringUtils.isEmpty(value)) { System.out.println(value); } //减少访问压力,定期睡眠一段时间 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }
但是轮询的方式比较消耗性能,所以可以尝试使用Redis的阻塞式弹出指令,例如下边这种方式来监听消息的触发行为:
/** * 阻塞的方式获取数据 */ private void blockGet(MsgWrapper msgWrapper) { while (true) { List<String> values = iRedisService.brpop(msgWrapper.getTopic()); if (!CollectionUtils.isEmpty(values)) { values.forEach(value -> { System.out.println(value); }); } } }
消息的可靠性传输如何确保?
在设计消息队列的时候,我们非常看重的就是消息的可靠性保证。当一条消息发送到消费端之后,如果出现了异常,希望消息能够实现重新发送的效果.
对于这种场景的设计我们可以尝试使用 BRPOPLPUSH 这条指令,这条指令可以帮助我们在Redis内部将数据弹出时写入到另一个备份队列中,这样即使弹出的消息消费失败了,备份队列中还有一份备用消息可以使用,而且弹出和写入备份队列操作在Redis内部做了封装,外界调用可以视作为一个原子操作.
是否可以支持广播的模式?
从List集合的实现原理来看,Redis弹出的元素只能返回给一个客户端链接,因此无法支持广播这种效果的实现.
基于发布订阅功能实现消息队列 。
Redis的内部提供了一个叫做发布订阅的功能,通过subscibe命令和publish指令可以帮助我们实现关于消息发布和通知的功能.
使用subscibe/publish命令实现的效果和List结构最大的不同在于它的传输方式:
publish部分的案例代码:
@Overridepublic boolean publish(String channel, String content) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.publish(channel, content); return true; } catch (Exception e) { throw new RuntimeException(e); } }
subscibe部分的代码:
@Overridepublic boolean subscribe(JedisPubSub jedisPubSub, String... channel) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.subscribe(jedisPubSub, channel); return true; } catch (Exception e) { throw new RuntimeException(e); } }
监听的部分可以通过额外开启一个线程来实现这部分效果:
@Componentpublic class RedisSubscribeMQListener implements IMQListener { @Resource private IRedisService iRedisService; class TestChannel extends JedisPubSub { @Override public void onMessage(String channel, String message) { super.onMessage(channel, message); System.out.println("channel " + channel + " 接收到消息:" + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } } //所有频道的消息都监听 @Override public void onMessageReach(MsgWrapper msgWrapper) { Thread thread = new Thread(new Runnable() { @Override public void run() { iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic()); } }); thread.start(); } }
要注意,回调通知的时候需要注入一个JedisPubSub的对象,这个对象的内部定义了接收消息之后的处理操作.
问题思考 。
如何保证消息的可靠性传输?
通过subscibe/publish处理的消息没有持久化的特性,一旦出现网络中断,Redis宕机这类异常的时候就会导致消息丢失,而且也没有较好的机制取支持消息重复消费的问题。因此可靠性方面较差.
基于Stream实现消息队列 。
Redis5.0中发布的Stream类型,也用来实现典型的消息队列。提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
关于Stream的一些基本入门篇章这里不做过多介绍,感兴趣的朋友可以去阅读下这篇文章:
https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd 。
下边的部分我们直接来进入关于Redis XStream相关的实战环节.
封装消息监听功能 。
首先是定义一个MQ相关的接口:
public interface RedisStreamListener { /** * 处理正常消息 */ HandlerResult handleMsg(StreamEntry streamEntry); }
接着是基于这套接口做消息发送的实现:
package org.idea.mq.redis.framework.listener; import com.alibaba.fastjson.JSON; import org.idea.mq.redis.framework.bean.HandlerResult; import org.idea.mq.redis.framework.config.StreamListener; import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener; import org.idea.mq.redis.framework.redis.IRedisService; import org.idea.mq.redis.framework.utils.PayMsg; import redis.clients.jedis.StreamEntry; import javax.annotation.Resource; import java.util.Map; import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS; /** * @Author linhao * @Date created in 10:07 下午 2022/2/9 */ @StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer") public class OrderPayedListener implements RedisStreamMQListener { @Resource private IRedisService iRedisService; @Override public HandlerResult handleMsg(StreamEntry streamEntry) { Map<String, String> map = streamEntry.getFields(); String json = map.get("json"); PayMsg payMsg = JSON.parseObject(json, PayMsg.class); System.out.println("pending payMsg is : " + payMsg); return SUCCESS; } }
自定义消息注解 。
package org.idea.mq.redis.framework.config; import org.springframework.stereotype.Component; import java.lang.annotation.*; /** * @Author linhao * @Date created in 10:04 下午 2022/2/9 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented@Componentpublic @interface StreamListener { String streamName() default ""; String groupName() default ""; String consumerName() default ""; }
代码中有一个自定义的@StreamListener的注解,该注解的内部包含了一个@Component的注解,可以将使用了该注解的对象注入到Spring容器中.
为了能将这些个初始化类进行自动装配,还需要加入一个配置的对象,代码如下:
package org.idea.mq.redis.framework.config; import org.idea.mq.redis.framework.bean.HandlerResult; import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener; import org.idea.mq.redis.framework.redis.IRedisService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.StreamPendingEntry; import javax.annotation.Resource; import java.util.List; import java.util.Map; import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS; /** * @Author linhao * @Date created in 3:25 下午 2022/2/7 */ @Configurationpublic class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> { @Resource private ApplicationContext applicationContext; @Resource private IRedisService iRedisService; private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class); @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class); beanMap.values().forEach(redisStreamMQListener -> { StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class); ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName()); Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService)); Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService)); handleThread.start(); pendingHandleThread.start(); logger.info("{} load successed ", redisStreamMQListener); }); } class PendingMsgHandlerThread implements Runnable { private ListenerInitWrapper listenerInitWrapper; private RedisStreamMQListener redisStreamMQListener; private IRedisService iRedisService; public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) { this.redisStreamMQListener = redisStreamMQListener; this.listenerInitWrapper = listenerInitWrapper; this.iRedisService = iRedisService; } @Override public void run() { String startId = "0-0"; while (true) { List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1); //如果该集合非空,则触发监听行为 if (!CollectionUtils.isEmpty(streamConsumersInfos)) { for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) { StreamEntryID streamEntryID = streamConsumersInfo.getID(); //比当前pending的streamId小1 String streamIdStr = streamEntryID.toString(); String[] items = streamIdStr.split("-"); Long timestamp = Long.valueOf(items[0]) - 1; String beforeId = timestamp + "-" + "0"; List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName()); for (Map.Entry<String, List<StreamEntry>> streamInfo : result) { List<StreamEntry> streamEntries = streamInfo.getValue(); for (StreamEntry streamEntry : streamEntries) { try { //业务处理 HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry); if (SUCCESS.equals(handlerResult)) { startId = streamEntryID.toString(); iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId)); } } catch (Exception e) { logger.error("[PendingMsgHandlerThread] e is ", e); } } } } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class CoreMsgHandlerThread implements Runnable { private ListenerInitWrapper listenerInitWrapper; private RedisStreamMQListener redisStreamMQListener; private IRedisService iRedisService; public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) { this.redisStreamMQListener = redisStreamMQListener; this.listenerInitWrapper = listenerInitWrapper; this.iRedisService = iRedisService; } @Override public void run() { while (true) { List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName()); for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) { List<StreamEntry> streamEntries = streamInfo.getValue(); for (StreamEntry streamEntry : streamEntries) { //业务处理 try { HandlerResult result = redisStreamMQListener.handleMsg(streamEntry); if (SUCCESS.equals(result)) { iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID()); } } catch (Exception e) { logger.error("[CoreMsgHandlerThread] e is ", e); } } } } } } }
其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据.
封装相关的消息发布功能 。
消息的发送部分比较简单,直接通过redis往stream里面写入数据即可 。
package org.idea.mq.redis.framework.producer; /** * @Author linhao * @Date created in 12:23 下午 2022/2/10 */ public interface IStreamProducer { /** * 指定streamName发布消息 * @param streamName * @param json */ void sendMsg(String streamName, String json); }
消息的传输格式采用json字符串的方式写入到redis内部的stream当中.
package org.idea.mq.redis.framework.producer; import org.idea.mq.redis.framework.redis.IRedisService; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * @Author linhao * @Date created in 12:19 下午 2022/2/10 */ public class StreamProducer implements IStreamProducer{ @Resource private IRedisService iRedisService; @Override public void sendMsg(String streamName,String json){ Map<String,String> map = new HashMap<>(); map.put("json",json); iRedisService.xAdd(streamName,map); } }
注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:
@Overridepublic boolean xAdd(String streamName, Map<String, String> stringMap) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap); return true; } catch (Exception e) { throw new RuntimeException(e); } }
为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:
在这里插入图片描述 。
组件的测试 。
点对点发送测试 。
建立两套微服务工程,user-service 和 order-service,其中user-service部署两个服务节点,同属user-service-group。order-service也要部署两个服务节点,同属order-service-group.
最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息.
在这里插入图片描述 。
广播发送测试 。
使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收.
在这里插入图片描述 。
具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分.
问题思考 。
为何同一个StreamName需要采用双线程消费?
一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况.
是否支持延迟重试 。
目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造.
原文地址:https://mp.weixin.qq.com/s/sMQGHvy4enf2e34KtBls-A 。
最后此篇关于基于Redis6.0 部署迷你版本消息队列的文章就讲到这里了,如果你想了解更多关于基于Redis6.0 部署迷你版本消息队列的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
无法使用 Hive 版本 1.1.0 HBase 版本 0.94.8 和 hadoop 版本 2.7.0 从 hive 创建 Hbase 表 hive (default)> CREATE TABLE
我试图为 electron app 创建可执行文件但面临这个问题 Unable to determine Electron version. Please specify an Electron ve
我正在尝试让自适应阈值在 python 绑定(bind)到 opencv 中工作(swig 一个 - 无法让 opencv 2.0 工作,因为我正在使用 beagleboard 因为交叉编译还没有工作
我一直在 linux 机器上使用 JMeter,在命令行下使用了一段时间。工作正常。 今天,我在 Windows 机器(新客户端等)上尝试了它,它确实可以工作,但在控制台窗口中输出有很大不同。 Lin
在我的编码环境中,我通常使用最新版本的 Java 和 Eclipse。当我编写源代码时,我不会注意我使用的 API 方法或类是否向后兼容旧版本的 Java 或 Eclipse。在 javadoc 中存
问题是关于版本的特定组合,但更普遍。 我刚刚从 Kubuntu 12.04 升级到 14.04。现在,当我想编译 CUDA 代码(使用 CUDA 6.5)时,我得到: #error -- unsupp
我目前正在对我的一些应用程序进行沙箱处理,看来我必须删除一些功能才能满足 Mac App Store 沙箱(和其他)规则。 显然用户不会因为失去功能而感到高兴,我担心他们不会指责苹果制定了愚蠢的规则,
我用 flash 和 js 版本创建了一个动画横幅。 是否可以检测低于版本 9 的 ie 版本,然后提供 Flash 横幅,否则提供 js 横幅。 最佳答案 您可以使用条件注释来检测 IE 版本
我有一个处理不同位置的数据库的应用程序,我想检查这些数据库是否使用 Firebird 2.5 或更高版本打开。我们最近从 Firebird 2.0 迁移到了 2.5,我们有很多数据库可以响应 sele
我正在开发一个应用程序,我使用托管在我的服务器上的 Java 和 Jersey 构建了后端部分。我在服务器上使用 Tomcat7 来调用 Web 服务。 我以前有一台安装了 Ubuntu 的计算机,我
我可以使用 GetVersionEx() 函数来获取 Windows 版本,但是这个函数将返回一个数字而不是一个字符串。但是没有问题,因为我可以将数字转换为字符串,例如: if (osvi.dwMaj
我已经在我的系统中安装了 Anaconda 2 & 3。 Anaconda 2 包含 python 2.7 & Anaconda 3 包含 python 3.6。 我需要使用命令提示符运行我的 pyt
我正在尝试构建一个 Android 项目,但发生了以下错误 Error:(10, 1) A problem occurred evaluating project ':app'. > Failed t
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 4 年前。 Improve this qu
在降级我的 GCC 之前,我想知道是否有办法确定我的机器中的哪些程序/框架或依赖项会中断,以及是否有更好的方法来执行 openpose 安装? (例如,在 CMake 中更改某些内容) 有没有办法在不
我已经在终端的代码sudo apt-get install Shadowsocks-qt5中安装了Shadowsocks-Qt5,然后我可以通过搜索找到启动图标,但是它当我点击图标时打不开。然后我尝试
在网络上找到的文档说,MLLP V2(第 2 版)是用于传输 HL7 版本 3 内容的所有消息传输协议(protocol)的要求。似乎 MLLP 第 2 版主要用于 HL7 第 3 版。 我们可以/应
我正在使用带有 selinium webdriver 的 Protractor 。我的chromeDriver版本是78.0.1,chrome版本是78.0.3904.97。两个版本都匹配,应该不会有
我正在按照教程设置 mysql 数据库并做一些事情。我无法找到数据库资源管理器。我读了很多,但在 Window->show View-> Dataxxx 或右侧上部选项卡中无法正常工作。 最佳答案 从
我已经在 KDE 桌面上安装了 Anaconda 2.0.1。当我运行 python 并看到所有已安装的模块时,我收到此消息“无法将不兼容的 Qt 库(版本 0x40801)与该库(版本 0x4080
我是一名优秀的程序员,十分优秀!