- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式.
注:图片来自RocketMQ官方文档 。
不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码.
除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题:
(1)队列只能分给组内一个消费者消费,也就无法通过扩展消费者的数量来提升消费能力; (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang主,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 。
在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡.
对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略。 注:图片来自RocketMQ官方文档 。
消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给组内多个消费者共同消费.
消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。当消费者获取到某条消息后,服务端会对该消息加锁,保证该消息对其他消费者不可见,直到消息消费成功或者超时,所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费.
消息粒度负载均衡策略适用于绝大多数在线处理的业务场景.
首先客户端(消费者)向服务端(Broker)发送Pop请求,Broker端收到请求后以Pop模式获取消息,之后返回给客户端,客户端消费消息成功之后,向Broker发送ACK请求确认消息消费成功.
当POP出一条消息之后,这条消息就会在一段时间内不可见,在这个时间段内,这条消息不会再被POP出来,如果在这个期间未能收到该消息的ACK请求,过了这个不可见的时间之后,消息就会恢复可见状态,重新被消费.
POP的消费位点由Broker保存和控制,并且POP模式可以使多个消费者端消费同一个消息队列中的消息,消费者端不再需要在本地做负载均衡分配消息队列,只需要调用服务端提供的POP接口获取消息进行消费即可,即便某个消费者hang住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积.
POP消息在Broker端的实现 。
Broker端在处理POP请求时,先在队列维度加锁,保证同一时间只有一个消费者可以从该队列中获取消息; 。
Broker端会从队列中获取一批消息,并构建这批消息对应的CheckPoint信息保存在Broker中,之后会与ACK的消息进行匹配; CheckPoint主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId等信息.
CheckPoint会优先保存在内存中,如果在一段时间内收到了客户端的ACK消息,就会将对应的CheckPoint清除,并更新消费进度; 。
对于一段时间内为收到ACK消息的CheckPoint,会将其从内存中删除,然后发送到延时主题 SCHEDULE_TOPIC_XXXX 中,到达延时时间之后,消息会再被转发到REVIVE_TOPIC(会使用 REVIVE_LOG_ + 集群名称 作为主题)中,有一个线程去处理REVIVE_TOPIC中的数据,将里面的消息拉取放入到一个 MAP中,如果后续收到对应的ACK消息,则会更新REVIVE_TOPIC主题中的消费位点标识消息消费完成,如果过了一定时间依旧未收到对应的ACK消息,会查找这个CheckPoint对应的真实消息,将其放入到重试队列中,等待客户端消费,所以消费者消费的时候有一定概率可以消费到重试队列中的消息.
由于一个消息队列中的消息可以被多个消费者消费,如果某个消费者在消费某条消息之后一直未发生ACK消息,那么Broker是如何管理消费进度的,比如队列1中有1、2、3、4、5条消息,此时有三个消费者1、2、3,分别分配到了队列中的1、2、3条消息,此时消费者1已经对消息1ACK完毕,消费者3也对消息3ACK完毕,消费者2一直未ACK消息2,那么Broker如何设置消费进度?
个人认为,在一段时间内消息2对应的CheckPoint未匹配到对应的ACK消息,为了保证消费可以继续向后消费消息,应该会推进消费进度跳过这个消息,对于消息2,会按照超时处理逻辑,将其对应的CheckPoint先放入延时队列,再放入REVIVE_TOPIC中,之后等待ACK,如果之后一直还未收到ACK再将其放入重试队列,等待重新消费.
参考 RocketMQ官方文档 。
RocketMQ 5.0 POP 消费模式探秘 。
最后此篇关于【RocketMQ】RocketMQ5.0新特性(二)-Pop消费模式的文章就讲到这里了,如果你想了解更多关于【RocketMQ】RocketMQ5.0新特性(二)-Pop消费模式的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
所以下面的内容让我很困惑。 #!/usr/bin/python test = [0, 0, 0, 1, 2, 3, 4, 5, 6] test1 = [0, 0, 0, 1, 2, 3, 4, 5,
这个问题是这个问题的后续问题: deque.popleft() and list.pop(0). Is there performance difference? 在 Python 中,我可以使用 .
我正在使用 bootstrap v2.2.2。我尝试了其他一些方法(即: close popover outside popover but inside stay open 和 How to dis
我正在用 Python 创建提交后脚本并使用子进程调用 git 命令。 在我的脚本中,我想在运行某些命令之前存储所有更改,然后将它们 pop 。问题是,如果没有任何东西可以存储,stash pop 会
我有一个嵌入在 UINavigationController 中的 UITableViewController,我正在尝试将 Peek & Pop 实现到 TableView 中。我的“窥视”部分工作
我的 Windows 机器上安装了 Cygwin、msysgit 和 TortoiseGit。我正在为 Cygwin 编写一个脚本,该脚本通过 ssh 将 git 推送到远程机器: git push
我在 Jenkins 中使用groovy,并且我需要这个字符串来获取其中的最后一个单词。假设字符串是 STATUS = "EXECUTE SIT" 。所以我所做的就是分割字符串,这样我就会得到一个数组
本文是不太具体的问题的后续/重新表述 Is it possible to have a hyperlink inside {content:"..."}? . 用户 Naeem Shaikh ,非常感
Navigator.of(context).pop 和 Navigator.pop(context) 有什么区别? 对我来说两者似乎都在做同样的工作,实际的区别是什么。一个被弃用了吗? 最佳答案 Na
这可能吗?我想要一个更简单的命令来 git stash pop stash@{13} 其中 stash@{13} 只是 last 意思是“最后的存储在列表上”或“最古老的藏品”。 我知道我可以为 gi
Closed. This question is not reproducible or was caused by typos。它当前不接受答案。 想改善这个问题吗?更新问题,以便将其作为on-to
Visual Studio 2019 中用于 GIT 存储的以下命令有什么区别? 分阶段 pop 和恢复 (--index) 全部 pop 为未暂存状态 使用https://visualstudio.
我想弹出模型的最后一层。所以我使用了 tf.keras.layers.pop(),但它不起作用。 base_model.summary() base_model.layers.pop() base_m
我想使用 navigator.pop 将值从第 2 页传递到第 1 页,并使用 initstate 中的新值刷新或重新加载我的第 1 页或任何其他解决方法? 我能够在第一页中获取这些值,但无法使用 i
pop 函数的文档说: user> (doc pop) ------------------------- clojure.core/pop ([coll]) For a list or queu
我有以下点击处理程序,当点击它时,我从 handsontable 中提取一个数组然后从数组中删除最后一个元素,并将新数组传递给 ajax post。问题是,如果我再次单击该按钮,它将从数组中删除另一个
我在mailmuch中制作了表单并从中获取了代码,我添加到网页并使用href,当用户单击显示弹出窗口时显示表单。没关系 show popup 但是现在我有ajax请求,我希望在ajax返回成功时显示此
我目前正在学习 Python 中的 pop() 函数并有一个问题。 >>> a = [1,2,3,4] >>> a.pop(3) #or a.pop() 4 >>> print(a) [1,2,3]
我目前正在学习 Python 中的 pop() 函数并有一个问题。 >>> a = [1,2,3,4] >>> a.pop(3) #or a.pop() 4 >>> print(a) [1,2,3]
我可以将对象$push编码到Mongo数组上,如下所示: db.foo.update({},{$push:{bar:3}}) 但是我找不到一种语法,可以让我对列表中的最后一项进行$pop编码。 我已经
我是一名优秀的程序员,十分优秀!