- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式.
注:图片来自RocketMQ官方文档 。
不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码.
除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题:
(1)队列只能分给组内一个消费者消费,也就无法通过扩展消费者的数量来提升消费能力; (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang主,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 。
在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡.
对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略。 注:图片来自RocketMQ官方文档 。
消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给组内多个消费者共同消费.
消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。当消费者获取到某条消息后,服务端会对该消息加锁,保证该消息对其他消费者不可见,直到消息消费成功或者超时,所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费.
消息粒度负载均衡策略适用于绝大多数在线处理的业务场景.
首先客户端(消费者)向服务端(Broker)发送Pop请求,Broker端收到请求后以Pop模式获取消息,之后返回给客户端,客户端消费消息成功之后,向Broker发送ACK请求确认消息消费成功.
当POP出一条消息之后,这条消息就会在一段时间内不可见,在这个时间段内,这条消息不会再被POP出来,如果在这个期间未能收到该消息的ACK请求,过了这个不可见的时间之后,消息就会恢复可见状态,重新被消费.
POP的消费位点由Broker保存和控制,并且POP模式可以使多个消费者端消费同一个消息队列中的消息,消费者端不再需要在本地做负载均衡分配消息队列,只需要调用服务端提供的POP接口获取消息进行消费即可,即便某个消费者hang住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积.
POP消息在Broker端的实现 。
Broker端在处理POP请求时,先在队列维度加锁,保证同一时间只有一个消费者可以从该队列中获取消息; 。
Broker端会从队列中获取一批消息,并构建这批消息对应的CheckPoint信息保存在Broker中,之后会与ACK的消息进行匹配; CheckPoint主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId等信息.
CheckPoint会优先保存在内存中,如果在一段时间内收到了客户端的ACK消息,就会将对应的CheckPoint清除,并更新消费进度; 。
对于一段时间内为收到ACK消息的CheckPoint,会将其从内存中删除,然后发送到延时主题 SCHEDULE_TOPIC_XXXX 中,到达延时时间之后,消息会再被转发到REVIVE_TOPIC(会使用 REVIVE_LOG_ + 集群名称 作为主题)中,有一个线程去处理REVIVE_TOPIC中的数据,将里面的消息拉取放入到一个 MAP中,如果后续收到对应的ACK消息,则会更新REVIVE_TOPIC主题中的消费位点标识消息消费完成,如果过了一定时间依旧未收到对应的ACK消息,会查找这个CheckPoint对应的真实消息,将其放入到重试队列中,等待客户端消费,所以消费者消费的时候有一定概率可以消费到重试队列中的消息.
由于一个消息队列中的消息可以被多个消费者消费,如果某个消费者在消费某条消息之后一直未发生ACK消息,那么Broker是如何管理消费进度的,比如队列1中有1、2、3、4、5条消息,此时有三个消费者1、2、3,分别分配到了队列中的1、2、3条消息,此时消费者1已经对消息1ACK完毕,消费者3也对消息3ACK完毕,消费者2一直未ACK消息2,那么Broker如何设置消费进度?
个人认为,在一段时间内消息2对应的CheckPoint未匹配到对应的ACK消息,为了保证消费可以继续向后消费消息,应该会推进消费进度跳过这个消息,对于消息2,会按照超时处理逻辑,将其对应的CheckPoint先放入延时队列,再放入REVIVE_TOPIC中,之后等待ACK,如果之后一直还未收到ACK再将其放入重试队列,等待重新消费.
参考 RocketMQ官方文档 。
RocketMQ 5.0 POP 消费模式探秘 。
最后此篇关于【RocketMQ】RocketMQ5.0新特性(二)-Pop消费模式的文章就讲到这里了,如果你想了解更多关于【RocketMQ】RocketMQ5.0新特性(二)-Pop消费模式的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
对此感到疯狂,真的缺少一些东西。 我有webpack 4.6.0,webpack-cli ^ 2.1.2,所以是最新的。 在文档(https://webpack.js.org/concepts/mod
object Host "os.google.com" { import "windows" address = "linux.google.com" groups = ["linux"] } obj
每当我安装我的应用程序时,我都可以将数据库从 Assets 文件夹复制到 /data/data/packagename/databases/ .到此为止,应用程序工作得很好。 但 10 或 15 秒后
我在 cc 模式缓冲区中使用 hideshow.el 来折叠我不查看的文件部分。 如果能够在 XML 文档中做到这一点就好了。我使用 emacs 22.2.1 和内置的 sgml-mode 进行 xm
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
根据java: public Scanner useDelimiter(String pattern) Sets this scanner's delimiting pattern to a patt
我读过一些关于 PRG 模式以及它如何防止用户重新提交表单的文章。比如this post有一张不错的图: 我能理解为什么在收到 2xx 后用户刷新页面时不会发生表单提交。但我仍然想知道: (1) 如果
看看下面的图片,您可能会清楚地看到这一点。 那么如何在带有其他一些 View 的简单屏幕中实现没有任何弹出/对话框/模式的微调器日期选择器? 我在整个网络上进行了谷歌搜索,但没有找到与之相关的任何合适
我不知道该怎么做,我一直遇到问题。 以下是代码: rows = int(input()) for i in range(1,rows): for j in range(1,i+1):
我想为重写创建一个正则表达式。 将所有请求重写为 index.php(不需要匹配),它不是以/api 开头,或者不是以('.html',或'.js'或'.css'或'.png'结束) 我的例子还是这样
MVC模式代表 Model-View-Controller(模型-视图-控制器) 模式 MVC模式用于应用程序的分层开发 Model(模型) - 模型代表一个存取数据的对象或 JAVA PO
我想为组织模式创建一个 RDF 模式世界。您可能知道,组织模式文档基于层次结构大纲,其中标题是主要的分组实体。 * March auxiliary :PROPERTIES: :HLEVEL: 1 :E
我正在编写一个可以从文件中读取 JSON 数据的软件。该文件包含“person”——一个值为对象数组的对象。我打算使用 JSON 模式验证库来验证内容,而不是自己编写代码。符合代表以下数据的 JSON
假设我有 4 张 table 人 公司 团体 和 账单 现在bills/persons和bills/companys和bills/groups之间是多对多的关系。 我看到了 4 种可能的 sql 模式
假设您有这样的文档: doc1: id:1 text: ... references: Journal1, 2013, pag 123 references: Journal2, 2014,
我有这个架构。它检查评论,目前工作正常。 var schema = { id: '', type: 'object', additionalProperties: false, pro
这可能很简单,但有人可以解释为什么以下模式匹配不明智吗?它说其他规则,例如1, 0, _ 永远不会匹配。 let matchTest(n : int) = let ran = new Rand
我有以下选择序列作为 XML 模式的一部分。理想情况下,我想要一个序列: 来自 my:namespace 的元素必须严格解析。 来自任何其他命名空间的元素,不包括 ##targetNamespace和
我希望编写一个 json 模式来涵盖这个(简化的)示例 { "errorMessage": "", "nbRunningQueries": 0, "isError": Fals
首先,我是 f# 的新手,所以也许答案很明显,但我没有看到。所以我有一些带有 id 和值的元组。我知道我正在寻找的 id,我想从我传入的三个元组中选择正确的元组。我打算用两个 match 语句来做到这
我是一名优秀的程序员,十分优秀!