- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
在 【RocketMQ】消息的拉取 一文中可知,消费者在启动的时候,会创建消息拉取API对象 PullAPIWrapper ,调用pullKernelImpl方法向Broker发送拉取消息的请求,那么在主从模式下消费者是如何选择向哪个Broker发送拉取请求的? 进入pullKernelImpl方法中,可以看到会 调用recalculatePullFromWhichNode方法选择一个Broker :
public class PullAPIWrapper {
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 调用recalculatePullFromWhichNode方法获取Broker ID,再调用findBrokerAddressInSubscribe根据ID获取Broker的相关信息
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
// ...
if (findBrokerResult != null) {
// ...
// 获取Broker地址
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// 发送消息拉取请求
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
}
}
在 recalculatePullFromWhichNode 方法中,会从 pullFromWhichNodeTable 中根据消息队列获取一个建议的Broker ID,如果获取为空就返回Master节点的Broker ID,ROCKETMQ中Master角色的Broker ID为0,既然从 pullFromWhichNodeTable 中可以知道从哪个Broker拉取数据,那么 pullFromWhichNodeTable 中的数据又是从哪里来的?
public class PullAPIWrapper {
// KEY为消息队列,VALUE为建议的Broker ID
private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
}
// 从pullFromWhichNodeTable中获取建议的broker ID
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}
// 返回Master Broker ID
return MixAll.MASTER_ID;
}
}
通过调用关系可知,在 updatePullFromWhichNode 方法中更新了 pullFromWhichNodeTable 的值,而 updatePullFromWhichNode 方法又是被 processPullResult 方法调用的,消费者向Broker发送拉取消息请求后,Broker对拉取请求进行处理时会设置一个broker ID(后面会讲到),建议下次从这个Broker拉取消息,消费者对拉取请求返回的响应数据进行处理时会调用 processPullResult 方法,在这里将建议的BrokerID取出,调用 updatePullFromWhichNode 方法将其加入到了 pullFromWhichNodeTable 中:
public class PullAPIWrapper {
private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
// 将拉取消息请求返回的建议Broker ID,加入到pullFromWhichNodeTable中
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
// ...
}
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (null == suggest) {
// 向pullFromWhichNodeTable中添加数据
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
} else {
suggest.set(brokerId);
}
}
}
接下来去看下是根据什么条件决定选择哪个Broker的.
Broker在处理消费者拉取请求时,会调用 PullMessageProcessor 的 processRequest 方法,首先会调用 MessageStore 的getMessage方法获取消息内容,在返回的结果 GetMessageResult 中设置了一个是否建议从Slave节点拉取的属性(这个值的设置稍后再说),会根据是否建议从slave节点进行以下处理:
subscriptionGroupConfig
订阅分组配置的 getWhichBrokerWhenConsumeSlowly
方法获取从节点将ID设置到响应中,否则下次依旧建议从主节点拉取消息,将MASTER节点的ID设置到响应中; 订阅分组配置 mqadmin命令的 -i 参数可以指定从哪个Broker消费消息( subscriptionGroupConfig 的 getBrokerId 返回的值), -w 参数可以指定建议从slave节点消费的时候,从哪个slave消费( subscriptionGroupConfig 的 getWhichBrokerWhenConsumeSlowly 方法返回的值):
usage: mqadmin updateSubGroup [-a <arg>] [-b <arg>] [-c <arg>] [-d <arg>] -g <arg> [-h] [-i <arg>] [-m <arg>]
[-n <arg>] [-q <arg>] [-r <arg>] [-s <arg>] [-w <arg>]
-i,--brokerId <arg> consumer from which broker id
-w,--whichBrokerWhenConsumeSlowly <arg> which broker id when consume slowly
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// ...
// 根据拉取偏移量获取消息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
// 是否建议从从节点拉取消息
if (getMessageResult.isSuggestPullingFromSlave()) {
// 选择一个从节点
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// 判断Broker的角色
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
// 如果不允许从从节点读取数据,设置为MasterID
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
// 如果开启了允许从从节点读取数据
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// 如果建议从从节点拉消息
if (getMessageResult.isSuggestPullingFromSlave()) {
// 获取从节点
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
else {
// 获取指定的broker
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
// 使用Master节点
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
}
}
DefaultMessageStore的getMessage方法中用于获取消息内容,并会根据消费者的拉取进度判断是否建议下次从Slave节点拉取消息,判断过程如下:
public class DefaultMessageStore implements MessageStore {
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
// ...
// 当前CommitLog的最大偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
// ...
} else {
// 根据消费进度获取消息队列
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
// ...
// CommitLog最大偏移量减去本次拉取消息的最大物理偏移量
long diff = maxOffsetPy - maxPhyOffsetPulling;
// 计算消息在PageCache中的总大小(总物理内存 * 消息存储在内存中的阀值/100)
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// 是否建议下次去从节点拉取消息
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
// ...
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
// ...
return getResult;
}
}
总结 消费者在启动后需要向Broker发送拉取消息的请求,Broker收到请求后会根据消息的拉取进度,返回一个建议的BrokerID,并设置到响应中返回,消费者处理响应时将建议的BrokerID放入pullFromWhichNodeTable,下次拉去消息的时候从pullFromWhichNodeTable中取出,并向其发送请求拉取消息.
上面讲解了主从模式下如何选择从哪个Broker拉取消息,接下来看下消费进度的持久化,因为广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端,所以接下来以集群模式为例.
在 【RocketMQ】消息的拉取 一文中可知,集群模式下主要是通过 RemoteBrokerOffsetStore 进行消费进度管理的,在持久化方法 persistAll 中会调用 updateConsumeOffsetToBroker 更新Broker端的消费进度:
public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
// 向Broker发送请求更新消费进度
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
// ...
}
}
由于 updateConsumeOffsetToBroker 方法中先调用了 findBrokerAddressInSubscribe 方法获取Broker的信息,所以这里先看 findBrokerAddressInSubscribe 方法是如何选择Broker的,它需要传入三个参数,分别为:Broker名称、Broker ID、是否只查找参数中传入的那个BrokerID,方法的处理逻辑如下:
brokerAddrTable
中根据Broker的名称获取所有的Broker集合(主从模式下他们的Broker名称一致,但是ID不一致),KEY为BrokerID,VALUE为Broker的地址;
public class MQClientInstance {
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName, // Broker名称
final long brokerId, // Broker ID
final boolean onlyThisBroker // 是否只查找参数中传入的那个BrokerID
) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
// 获取所有的Broker ID
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
brokerAddr = map.get(brokerId);
// 是否是从节点
slave = brokerId != MixAll.MASTER_ID;
// 地址是否为空
found = brokerAddr != null;
// 如果地址为空并且是从节点
if (!found && slave) {
// 获取下一个Broker
brokerAddr = map.get(brokerId + 1);
found = brokerAddr != null;
}
// 如果地址为空
if (!found && !onlyThisBroker) {
// 获取集合中的第一个节点
Entry<Long, String> entry = map.entrySet().iterator().next();
// 获取地址
brokerAddr = entry.getValue();
// 是否是从节点
slave = entry.getKey() != MixAll.MASTER_ID;
// 置为true
found = true;
}
}
if (found) {
// 返回数据
return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
}
}
回到updateConsumeOffsetToBroker方法,先看第一次调用findBrokerAddressInSubscribe方法获取Broker信息,传入的三个参数分别为:Broker名称、Master节点的ID、true,根据上面讲解的 findBrokerAddressInSubscribe 方法里面的查找逻辑,如果查找到Master节点的信息,就正常返回,如果此时Master宕机未能正常查找到,由于传入的Master节点的ID并且onlyThisBroker置为true,所以会查找失败返回NULL.
如果第一次调用为空,会进行第二次调用,与第一次调用不同的地方是第三个参数置为了false,也就是说不是必须选择参数中指定的那个Broker,此时依旧优先查找Master节点,如果Master节点未查找到,由于onlyThisBroker置为了false,会迭代集合选择第一个节点返回,此时返回的有可能是从节点.
总结:消费者会优先选择向主节点发送请求进行消费进度保存,假如主节点宕机等原因未能获取到主节点的信息,会迭代集合选择第一个节点返回,所以消费者也可以向从节点发送请求进行进度保存,待主节点恢复后,依旧优先选择主节点.
public class RemoteBrokerOffsetStore implements OffsetStore {
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
// 更新消费进度
updateConsumeOffsetToBroker(mq, offset, true);
}
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
// 第一次调用findBrokerAddressInSubscribe方法获取Broker信息,三个参数分别为:Broker名称、Master节点的ID、true
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
// 如果获取为空,进行第二次调用
if (null == findBrokerResult) {
// 三个参数分别为:Broker名称、Master节点的ID、false
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
}
if (findBrokerResult != null) {
// 设置请求头
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
// 发送保存消费进度的请求
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
}
BrokerController 在构造函数中,实例化了 SlaveSynchronize ,并在start方法中调用了 handleSlaveSynchronize 方法处理从节点的数据同步, 如果当前的Broker是从节点,会注册定时任务,定时调用 SlaveSynchronize 的syncAll方法进行数据同步:
public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// ...
this.slaveSynchronize = new SlaveSynchronize(this);
//...
}
public void start() throws Exception {
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
// 处理从节点的同步
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
}
private void handleSlaveSynchronize(BrokerRole role) {
// 如果是SLAVE节点
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
// 设置定时任务,定时进行数据同步
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 同步数据
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
}
在SlaveSynchronize的 syncAll 方法中,又调用了 syncConsumerOffset 方法同步消费进度:
public class SlaveSynchronize {
public void syncAll() {
this.syncTopicConfig();
// 同步消费进度
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
// 向主节点发送请求获取消费进度信息
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
// 设置数据
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
// 将获取到的消费进度数据进行持久化
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
}
参考 丁威、周继锋《RocketMQ技术内幕》 RocketMQ 主从同步若干问题答疑 RocketMq 订阅分组创建和删除 。
RocketMQ版本:4.9.3 。
最后此篇关于【RocketMQ】主从模式下的消费进度管理的文章就讲到这里了,如果你想了解更多关于【RocketMQ】主从模式下的消费进度管理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
什么是RocketMQ RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。 常见的MQ主要有
1、说说你们公司线上生产环境用的是什么消息中间件? 见【2、多个mq如何选型?】 2、多个mq如何选型? MQ 描述 Rabb
消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求 ConsumeReques
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
RocketMQ支持集群部署来保证高可用。它基于主从模式,将节点分为Master、Slave两个角色,集群中可以有多个Master节点,一个Master节点可以有多个Slave节点。Master节点
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
RocketMQ是通过 DefaultMQProducer 进行消息发送的,它实现了 MQProducer 接口, MQProducer 接口中定义了消息发送的方法,方法主要分为三大类:
当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。 数据校验 封装消息 首先Broker会创
在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的。 RocketMQ消息的消费以组为单位
NameServer是一个注册中心,提供服务注册和服务发现的功能。NameServer可以集群部署,集群中每个节点都是对等的关系(没有像ZooKeeper那样在集群中选举出一个Master节点),节
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
消息存储 在 【RocketMQ】消息的存储 一文中提到,Broker收到消息后会调用 CommitLog 的asyncPutMessage方法写入消息,在DLedger模式下使用的是
RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡.
RocketMQ有两种获取消息的方式,分别为推模式和拉模式。 推模式 推模式在 【RocketMQ】消息的拉取 一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者
主从同步的实现逻辑主要在 HAService 中,在 DefaultMessageStore 的构造函数中,对 HAService 进行了实例化,并在start方法中,启动了 HAService :
在 【RocketMQ】消息的拉取 一文中可知,消费者在启动的时候,会创建消息拉取API对象 PullAPIWrapper ,调用pullKernelImpl方法向Broker发送拉取消息的
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息; 预设值的延迟时间间隔为: 1s、 5s、 10s、 30s、 1m
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。 其中,level=0 级表示不延时,level=1 表示 1 级延时,le
我尝试给 rockerMQ broker 加注星标,但我收到了错误消息: There is insufficient memory for the Java Runtime Environment t
我是一名优秀的程序员,十分优秀!