- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章RabbitMQ 高可用之如何确保消息成功消费由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
前面一篇文章松哥和大家聊了 MQ 高可用之如何确保消息成功发送,各种配置齐上阵,最终确保了消息的成功发送,甚至在一些极端情况下还可能发生同一条消息重复发送的情况,不管怎么样,消息总算发送出去了,如果小伙伴们还没看过上篇文章,建议先看看,再来学习本文:
四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?
今天我们就来聊一聊消息消费的问题,看看如何确保消息消费成功,并且确保幂等性.
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
两种方式我都举个例子看下.
先来看推(push):
这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:
当监听的队列中有消息时,就会触发该方法.
再来看拉(pull):
调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0.
这是消息两种不同的消费模式.
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能.
在上篇文章中,我们想尽办法确保消息能够发送成功,对于消息消费成功,其实官方提供了相关的机制,我们一起来看下.
为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式.
当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除.
当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者.
我们来看一张图:
如上图所示,在 RabbitMQ 的 web 管理页面:
这是我们可以从 UI 层面观察消息的消费情况确认情况.
当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费.
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题.
当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了.
需要注意的是,basicReject 方法一次只能拒绝一条消息.
消息确认分为自动确认和手动确认,我们分别来看.
先来看看自动确认,在 Spring Boot 中,默认情况下,消息消费就是自动确认的.
我们来看如下一个消息消费方法:
通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了.
手动确认我又把它分为两种:推模式手动确认与拉模式手动确认.
4.2.1 推模式手动确认 。
要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:
这个配置表示将消息的确认模式改为手动确认.
接下来我们来看下消费者中的代码:
将消费者要做的事情放到一个 try..catch 代码块中.
如果消息正常消费成功,则执行 basicAck 完成确认.
如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败.
这里涉及到两个方法:
当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题,这个松哥以后再专门写文章和大家细聊.
4.2.2 拉模式手动确认 。
拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:
这里涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述.
最后我们再来说说消息的幂等性问题.
大家设想下面一个场景:
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次(参见四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?)。种种原因导致我们在消费消息时,一定要处理好幂等性问题.
幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路.
采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack.
极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId.
当然这只是一个简单思路供大家参考.
松哥在 vhr 项目中也处理了消息幂等性问题,感兴趣的小伙伴可以查看 vhr 源码(https://github.com/lenve/vhr),代码在 mailserver 中.
好啦,今天就和小伙伴们聊了下 RabbitMQ 中和消息消费相关的几个话题,感兴趣的小伙伴可以实践下哦~ 。
原文链接:https://mp.weixin.qq.com/s/5szA0KBpFn9G3DeS9C0U3w 。
最后此篇关于RabbitMQ 高可用之如何确保消息成功消费的文章就讲到这里了,如果你想了解更多关于RabbitMQ 高可用之如何确保消息成功消费的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在寻找一种方法来创建根据价格选择我的产品的过滤器(选择下拉菜单)。 我知道这样的查询是完全可能的: SELECT * FROM products ORDER BY price ASC SELECT
函数参数中或显示尺寸时(高度,宽度)的顺序是否有约定? 最佳答案 我不知道大量的语言,但我使用过的语言(宽度,高度)。它更适合沿着 (x, y) 坐标线。 关于language-agnostic -
在我的表单中,我让用户输入房间的长度高度和宽度以获得 m2、m3 和瓦特的计算值。但是用户也应该能够直接输入 height 和 m2 来获取值。我尝试了很多语法,但 if else 不能正常工作。我知
我在 Elasticsearch 中创建了一个索引,看起来像 {"amazingdocs":{"aliases":{},"mappings":{"properties":{"Adj Close":{"
我有以下功能,我需要清除数据库中的所有图片列并移动到文件系统。当我一次性完成这一切时,内存太多并且会崩溃。我切换到递归函数并执行 20 次写入和批量操作。 我需要为大约 6 个表执行此操作。我的 Re
我正在编写一个函数来计算 PI 的值,并将其作为 double 值返回。到目前为止,一切都很好。但是一旦函数到达小数点后14位,它就不能再保存了。我假设这是因为 double 有限。我应该怎么做才能继
2020年是中国CDN行业从98年诞生到今天快速发展的第二十四年,相关数据显示,全国感知网速持续上扬,达到了3.29兆/秒,标志着在宽带中国的政策指导下,中国的网速水平正在大步赶上世界发达国家的水平
在 aerospike 集合中,我们有四个 bin userId、adId、timestamp、eventype,主键是 userId:timestamp。在 userId 上创建二级索引以获取特定用
$('#container').highcharts('Map', { title : { text : 'Highmaps basic demo'
有没有办法显示自定义宽度/高度的YouTube视频? 最佳答案 在YouTube网站上的this link中: You can resize the player by editing the obj
我使用 Highcharts ,我想在 Highcharts 状态下悬停时制作动态不同的颜色。 正如你可以看到不同的颜色,这就是我做的 var usMapChart , data = [] ; va
在所有节点上运行 tpstats 后。我看到很多节点都有大量的 ALL TIME BLOCKED NTR。我们有一个 4 节点集群,NTR ALL TIME BLOCKED 的值为: 节点 1:239
我发现 APC 上存在大量碎片 (>80%),但实际上性能似乎相当不错。我有 read another post这建议在 wordpress/w3tc 中禁用对象缓存,但我想知道减少碎片是否比首先缓存
对于我的脚本类(class),我们必须制作更高/更低的游戏。到目前为止,这是我的代码: import random seedVal = int(input("What seed should be u
我发现 APC 上存在大量碎片 (>80%),但实际上性能似乎相当不错。我有 read another post这建议在 wordpress/w3tc 中禁用对象缓存,但我想知道减少碎片是否比首先缓存
对于我的脚本类(class),我们必须制作更高/更低的游戏。到目前为止,这是我的代码: import random seedVal = int(input("What seed should be u
我已经 seen >2 字节的 unicode 代码点,如 U+10000 可以成对编写,如 \uD800\uDC00。它们似乎以半字节 d 开头,但我只注意到了这一点。 这个 split Actio
有人可以帮我理解为什么我的饼图百分比计算不正确吗?看截图: 根据我的计算,如 RHS 上所示,支出百分比应为 24.73%。传递给 Highcharts 的值如下:- 花费:204827099.36-
我阅读了有关该问题的所有答案,但我还没有找到任何解决方案。 我有一个应用程序,由我的 api 服务器提供。 Wildfly 8.1 和 Mysql 5.6。当查看时间到来时(Wildfly 服务器连接
我正在用选定的项目创建圆形导航。当用户单击任何项目时,它将移动到定义的特定点。一切都很好,除了当你继续点击项目时,当动画表现不同并且项目在 360 度圆中移动并且它被重置直到你重复场景时,我希望它
我是一名优秀的程序员,十分优秀!