- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的.
RocketMQ消息的消费以组为单位,有两种消费模式:
广播模式:同一个消息队列可以分配给组内的每个消费者,每条消息可以被组内的消费者进行消费.
集群模式:同一个消费组下,一个消息队列同一时间只能分配给组内的一个消费者,也就是一条消息只能被组内的一个消费者进行消费.
通常使用集群模式的情况比较多,接下来以集群模式(Push模式)为例看一下消息的拉取过程.
消费者在启动的时候主要做了以下几件事情:
RocketMQ消费者以组为单位,启用消费者时,需要设置消费者组名称以及要订阅的Topic信息(需要知道要消费哪个Topic上面的消息):
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private static DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
// ...
// 消费者组名称
String consumerGroup = "FooBarGroup";
// 实例化DefaultMQPushConsumer
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
// ...
// 设置订阅的主题
pushConsumer.subscribe("FooBar", "*");
// 启动消费者
pushConsumer.start();
}
}
所以消费者启动的时候,首先会获取订阅的Topic信息,由于一个消费者可以订阅多个Topic,所以消费者使用一个Map存储订阅的Topic信息,KEY为Topic名称,VALUE为对应的表达式,之后会遍历每一个订阅的Topic,然后将其封装为 SubscriptionData 对象,并加入到负载均衡对象 RebalanceImpl 中,等待进行负载均衡.
MQClientInstance 中有以下几个关键信息:
PullMessageService
,是用来从Broker拉取消息的服务; RebalanceService
,是用来进行负载均衡,为每个消费者分配对应的消费队列; MQConsumerInner
对象,每一个消费者启动的时候会向这里注册,将自己加入到 consumerTable
中; 需要注意 MQClientInstance 实例是以clientId为单位创建的,相同的clientId共用一个 MQClientInstance 实例,clientId由以下信息进行拼装: (1)服务器的IP; (2)实例名称(instanceName); (3)单元名称(unitName)(不为空的时候才拼接); 最终拼接的clientId字符串为: 服务器IP + @ + 实例名称 + @ + 单元名称 。 所以在同一个服务器上,如果实例名称和单元名称也相同的话,所有的消费者会共同使用一个 MQClientInstance 实例.
MQClientInstance 启动的时候会把消息拉取服务和负载均衡服务也启动(启动对应的线程).
前面已经得知了当前消费者订阅的Topic信息,接下来需要知道这些Topic的分布情况,也就是分布在哪些Broker上,Topic的分布信息可以从NameServer中获取到,因为Broker会向NameServer进行注册,上报自己负责的Topic信息,所以这一步消费者向NameServer发送请求,从NameServer中拉取最新的Topic的路由信息缓存在本地.
消费者在进行消费的时候,需要知道应该从哪个位置开始拉取消息, OffsetStore 类中记录这些数据,不同的模式对应的实现类不同:
RemoteBrokerOffsetStore
。 LocalFileOffsetStore
。 这里关注集群模式,在集群模式下,加载消费进度时,会进入 RemoteBrokerOffsetStore 的load方法,load方法是从本地加载文件读取消费进度,因为集群模式下需要从Broker获取,所以load方法什么也没干,在负载均衡分配了消息队列,进行消息拉取的时候再向Broker发送请求获取消费进度.
由于消费者增加或者减少会影响消息队列的分配,所以Broker需要感知消费者的上下线情况,消费者在启动时会向所有的Broker发送心跳包进行注册,通知Broker消费者上线.
Broker收到消费者发送的心跳包之后,会从请求中解析相关信息,将该消费者注册到Broker维护的消费者列表 consumerTable 中,其中KEY为消费者组名称,Value为该消费组的详细信息( ConsumerGroupInfo 对象),里面记录了该消费组下所有消费者的Channel信息.
启动最后一步,会立即触发一次负载均衡,为消费者分配消息队列.
负载均衡是通过消费者启动时创建的 MQClientInstance 实例实现的( doRebalance 方法),它的处理逻辑如下:
MQClientInstance 中有一个消费者列表 consumerTable ,存放了该实例上注册的所有消费者对象,Key为组名称,Value为消费者,所以会遍历所有的消费者,对该实例上注册的每一个消费者进行负载均衡; 。
对于每一个消费者,需要获取其订阅的所有Topic信息,然后再对每一个Topic进行负载均衡,前面可知消费者订阅的Topic信息被封装为了 SubscriptionData 对象,所以这里获取到所有的 SubscriptionData 对象进行遍历,开始为每一个消费者分配消息队列; 。
这里我们关注集群模式下的分配,它的处理逻辑如下:
MessageQueue
对象); 消费者在启动时向NameServer发送请求获取Topic的路由信息,从中解析中每个主题对应的消息队列,放入负载均衡对象的 topicSubscribeInfoTable 变量中,所以这一步直接从topicSubscribeInfoTable中获取主题对应的消息队列即可.
根据主题信息和消费者组名称,查找订阅了该主题的所有消费者的ID: (1) 根据主题选取Broker :从NameServer中拉取的主题路信息中可以找到每个主题分布在哪些Broker上,从中随机选取一个Broker; (2) 向Broker发送请求 :根据上一步获取到的Broker,向其发送请求,查找订阅了该主题的所有消费者的ID(消费者会向Broker注册,所以可以通过Broker查找订阅了某个Topic的消费者); 。
如果主题对应的消息队列集合和获取到的消费者ID都不为空,对消息队列集合和消费ID集合进行排序; 。
获取分配策略,根据具体的分配策略,为当前的消费者分配对应的消费队列,RocketMQ默认提供了以下几种分配策略:
AllocateMessageQueueAveragely:平均分配策略,根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数.
AllocateMessageQueueAveragelyByCircle:平均轮询分配策略,将消息队列逐个分发给每个消费者.
AllocateMessageQueueConsistentHash:根据一致性 hash进行分配.
AllocateMessageQueueByConfig:根据配置,为每一个消费者配置固定的消息队列 .
AllocateMessageQueueByMachineRoom:分配指定机房下的消息队列给消费者.
AllocateMachineRoomNearby:优先分配给同机房的消费者.
根据最新分配的消息队列,更新当前消费者负责的消息处理队列,
每个消息队列( MessageQueue )对应一个处理队列( ProcessQueue ),后续使用这个 ProcessQueue 记录的信息进行消息拉取:
分配给当前消费者的所有消息队列,由一个Map存储( processQueueTable ),KEY为消息队列,value为对应的处理队列:
由于负载均衡之后,消费者负责的消息队列可能发生变化,所以这里需要更新当前消费者负责的消息队列,它主要是拿负载均衡后重新分配给当前消费的消息队列集合与上一次记录的分配信息做对比,有以下两种情况:
(1) 某个消息队列之前分配给了当前消费者,但是这次没有,说明此队列不再由当前消费者消负责,需要进行删除,此时将该消息队列对应的处理队列中的 dropped 状态置为true即可; (2) 某个消费者之前未分配给当前消费者,但是本次负载均衡之后分配给了当前消费者,需要进行新增,会新建一个处理队列( ProcessQueue )加入到 processQueueTable 中; 。
对于情况2,由于是新增分配的消息队列,消费者还需要知道从哪个位置开始拉取消息,所以需要通过 OffsetStore 来获取存储的消费进度,也就是上次消费到哪条消息了,然后判断本次从哪条消息开始拉取。 前面在消费者启动的提到集群模式下对应的实现类为 RemoteBrokerOffsetStore ,再进入到这一步的时候,才会向Broker发送请求,获取消息队列的消费进度,并更新到 offsetTable 中.
从Broker获取消费进度之后,有以下几种拉取策略: (1) CONSUME_FROM_LAST_OFFSET(上次消费位置开始拉取) :从 OffsetStore 获取消息队列对应的消费进度值 lastOffset ,判断是否大于等于0,如果大于0则返回 lastOffset 的值,从这个位置继续拉取; (2) CONSUME_FROM_FIRST_OFFSET(第一个位置开始拉取) :从 OffsetStore 获取消息队列对应的消费进度值 lastOffset ,如果大于等于0,依旧从这个位置继续拉取,否则才从第一条消息拉取,此时返回值为0; (3) CONSUME_FROM_TIMESTAMP(根据时间戳拉取) :从 OffsetStore 获取消息队列对应的消费进度值 lastOffset ,如果大于等于0,依旧从这个位置继续拉取,否则在不是重试TOPIC的情况下,根据消费者的启动时间查找应该从什么位置开始消费; 。
nextOffset 拉取偏移量的值确定之后,会将 ProcessQueue 加入到 processQueueTable 中,并构建对应的消息拉取请求 PullRequest ,并设置以下信息:
nextOffset
的值; PullRequest 构建完毕之后会将其加入到消息拉取服务中的一个阻塞队列中,等待消息拉取服务进行处理.
消息拉取服务中,使用了一个阻塞队列,阻塞队列中存放的是消息拉取请求 PullRequest 对象,如果有消息拉取请求到来,就会从阻塞队列中取出对应的请求进行处理,从Broker拉取消息,拉取消息的处理逻辑如下:
PullRequest
中获取对应的处理队列 ProcessQueue
,先判断是否置为Dropped删除状态,如果处于删除状态不进行处理; ConsumeConcurrentlyMaxSpan
的值,如果超过需要进行流量控制,延迟50毫秒后重新加入队列中进行处理; ProcessQueue
关联了一个消息队列 MessageQueue
对象,消息队列对象中有其所在的Broker名称,根据名称再查找该Broker的详细信息; ConsumeQueue RocketMQ在消息存储的时候将消息顺序写入CommitLog文件,如果想根据Topic对消息进行查找,需要扫描所有CommitLog文件,这种方式性能低下,所以RocketMQ又设计了 ConsumeQueue 存储消息的逻辑索引,在RocketMQ的存储文件目录下,有一个consumequeue文件夹,里面又按Topic分组,每个Topic一个文件夹,Topic文件夹内是该Topic的所有消息队列,以消息队列ID命名文件夹,每个消息队列都有自己对应的 ConsumeQueue 文件:
ConsumeQueue中存储的每条数据大小是固定的,总共20个字节:
Broker在收到消费发送的拉取消息请求后,会根据拉取请求中的Topic名称和消息队列ID( queueId )查找对应的消费信息 ConsumeQueue 对象: Broker中的 consumeQueueTable 中存储了每个Topic对应的消费队列信息,Key为Topic名称,Value为Topic对应的消费队列信息,它又是一个MAP,其中Key为消息队列ID( queueId ),value为该消息队列的消费消费信息( ConsumeQueue 对象).
在获取到息 ConsumeQueue 之后,从中可以获取其中记录的最小偏移量 minOffset 和最大偏移量 maxOffset ,然后与拉取请求中携带的消息偏移量 offset 的值对比进行合法校验,校验通过才可以查找消息,对于消息查找结果大概有如下几种状态:
nextOffsetCorrection方法 :用于校正消费者的拉取偏移量,不过需要注意, 当前Broker是主节点或者开启了 OffsetCheckInSlave 校验时,才会对拉取偏移量进行纠正 ,所以以下几种状态中如果调用了此方法进行校正,前提是满足此条件.
CommitLog
中的最大偏移量 maxOffset
值为0,说明当前消息队列中还没有消息,返回 NO_MESSAGE_IN_QUEUE
状态; offset
的值小于 CommitLog
文件的最小偏移量 minOffset
,说明拉取进度值过小,调用 nextOffsetCorrection
校正下一次的拉取偏移量为 CommitLog
文件的最小偏移量(需要满足校正的条件),并将这个偏移量放入 nextBeginOffset
变量; offset
等于 CommitLog
文件的最大偏移量 maxOffset
,依旧调用 nextOffsetCorrection
方法进行校正(需要满足校正的条件),只不过校正的时候使用的还是 offset
的值,可以理解为这种情况什么也没干。 offset
大于 CommitLog
文件最大偏移量 maxOffset
,说明拉取偏移量越界,此时有以下两种情况:
minOffset
量为0,调用 nextOffsetCorrection
方法校正下一次拉取偏移量为 minOffset
的值(需要满足校正的条件),也就是告诉消费者,下次从偏移量为0的位置开始拉取消息; minOffset
不为0,调用 nextOffsetCorrection
方法校正下一次拉取偏移量为 maxOffset
的值(需要满足校正的条件),将下一次拉取偏移量的值设置为最大偏移量; offset
的值介于最大最小偏移量之间,此时可以正常查找消息; 需要注意以上是消息查找的结果状态,Broker并没有使用这个状态直接返回给消费者,而是又做了一次处理.
经过以上步骤后,除了查找到的消息内容,Broker还会在消息返回结果中设置以下信息:
nextBeginOffset
变量的值; CommitLog
文件的最小偏移量 minOffset
和最大偏移量 maxOffset
; 消费者收到Broker返回的响应后,对响应结果进行处理:
ConsumeMessageService
中进行消费(异步处理) ,然后判断拉取间隔的值是否大于0,如果大于0,会延迟一段时间进行下一次拉取,如果拉取间隔小于0表示需要立刻进行下一次拉取,此时将拉取请求加入阻塞队列中进行下一次拉取。 offsetStore
),并持久化保存,然后将当前的拉取请求中的处理队列状态置为dorp并删除处理队列,等待下一次重新构建拉取请求进行处理。 RocketMQ消息拉取相关源码可参考: 【RocketMQ】【源码】消息的拉取 。
最后此篇关于【RocketMQ】消息的拉取总结的文章就讲到这里了,如果你想了解更多关于【RocketMQ】消息的拉取总结的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我一直在读到,如果一个集合“被释放”,它也会释放它的所有对象。另一方面,我还读到,一旦集合被释放,集合就会释放它的对象。 但最后一件事可能并不总是发生,正如苹果所说。系统决定是否取消分配。在大多数情况
我有一个客户端-服务器应用程序,它使用 WCF 进行通信,并使用 NetDataContractSerializer 序列化对象图。 由于服务器和客户端之间传输了大量数据,因此我尝试通过微调数据成员的
我需要有关 JMS 队列和消息处理的帮助。 我有一个场景,需要针对特定属性组同步处理消息,但可以在不同属性组之间同时处理消息。 我了解了特定于每个属性的消息组和队列的一些知识。我的想法是,我想针对
我最近开始使用 C++,并且有一种强烈的冲动 #define print(msg) std::cout void print(T const& msg) { std::cout void
我已经为使用 JGroups 编写了简单的测试。有两个像这样的简单应用程序 import org.jgroups.*; import org.jgroups.conf.ConfiguratorFact
这个问题在这里已经有了答案: Firebase messaging is not supported in your browser how to solve this? (3 个回答) 7 个月前关
在我的 C# 控制台应用程序中,我正在尝试更新 CRM 2016 中的帐户。IsFaulted 不断返回 true。当我向下钻取时它返回的错误消息如下: EntityState must be set
我正在尝试通过 tcp 将以下 json 写入 graylog 服务器: {"facility":"GELF","file":"","full_message":"Test Message Tcp",
我正在使用 Django 的消息框架来指示成功的操作和失败的操作。 如何排除帐户登录和注销消息?目前,登录后登陆页面显示 已成功登录为“用户名”。我不希望显示此消息,但应显示所有其他成功消息。我的尝试
我通过编写禁用qDebug()消息 CONFIG(release, debug|release):DEFINES += QT_NO_DEBUG_OUTPUT 在.pro文件中。这很好。我想知道是否可以
我正在使用 ThrottleRequest 来限制登录尝试。 在 Kendler.php 我有 'throttle' => \Illuminate\Routing\Middleware\Throttl
我有一个脚本,它通过die引发异常。捕获异常时,我想输出不附加位置信息的消息。 该脚本: #! /usr/bin/perl -w use strict; eval { die "My erro
允许的消息类型有哪些(字符串、字节、整数等)? 消息的最大大小是多少? 队列和交换器的最大数量是多少? 最佳答案 理论上任何东西都可以作为消息存储/发送。实际上您不想在队列上存储任何内容。如果队列大部
基本上,我正在尝试创建一个简单的 GUI 来与 Robocopy 一起使用。我正在使用进程打开 Robocopy 并将输出重定向到文本框,如下所示: With MyProcess.StartI
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
我得到了一个带有 single_selection 数据表和一个命令按钮的页面。命令按钮调用一个 bean 方法来验证是否进行了选择。如果不是,它应该显示一条消息警告用户。如果进行了选择,它将导航到另
我知道 MSVC 可以通过 pragma 消息做到这一点 -> http://support.microsoft.com/kb/155196 gcc 是否有办法打印用户创建的警告或消息? (我找不到谷
当存在大量节点或二进制数据时, native Erlang 消息能否提供合理的性能? 情况 1:有一个大约 50-200 台机器的动态池(erlang 节点)。它在不断变化,每 10 分钟大约添加或删
我想知道如何在用户登录后显示“欢迎用户,您已登录”的问候消息,并且该消息应在 5 秒内消失。 该消息将在用户成功登录后显示一次,但在同一 session 期间连续访问主页时不会再次显示。因为我在 ho
如果我仅使用Welcome消息,我的代码可以正常工作,但是当打印p->client_name指针时,消息不居中。 所以我的问题是如何将消息和客户端名称居中,就像它是一条消息一样。为什么它目前仅将消
我是一名优秀的程序员,十分优秀!