- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡.
接下来以集群模式下的消息推模式 DefaultMQPushConsumerImpl 为例,看一下负载均衡的过程.
首先,消费者在启动时会做如下操作:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
// ...
// 更新当前消费者订阅主题的路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 向Broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒负载均衡服务
this.mQClientFactory.rebalanceImmediately();
}
}
为了保证消费者拿到的主题路由信息是最新的(topic下有几个消息队列、消息队列的分布信息等),在进行负载均衡之前首先要更新主题的路由信息,在 updateTopicSubscribeInfoWhenSubscriptionChanged 方法中可以看到,首先获取了当前消费者订阅的所有主题信息(一个消费者可以订阅多个主题),然后进行遍历,向NameServer发送请求,更新每一个主题的路由信息,保证路由信息是最新的:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
// 获取当前消费者订阅的主题信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历订阅的主题信息
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
// 从NameServer更新主题的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
}
由于Broker需要感知消费者数量的增减,所以每个消费者在启动的时候,会调用 sendHeartbeatToAllBrokerWithLock 向Broker发送心跳包,进行消费者注册:
public class MQClientInstance {
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 调用sendHeartbeatToAllBroker向Broker发送心跳
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
}
在 sendHeartbeatToAllBroker 方法中,可以看到从 brokerAddrTable 中获取了所有的Broker进行遍历(主从模式下也会向从节点发送请求注册),调用 MQClientAPIImpl 的 sendHearbeat 方法向每一个Broker发送心跳请求进行注册:
public class MQClientInstance {
// Broker路由表
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// 发送心跳
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
// ...
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
// 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey(); // broker name
// 获取同一个Broker Name下的所有Broker实例
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
// 遍历所有的实例
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) { // 如果地址不为空
// ...
try {
// 发送心跳
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
// ...
} catch (Exception e) {
// ...
}
}
}
}
}
}
}
}
在 MQClientAPIImpl 的 sendHearbeat 方法中,可以看到构建了 HEART_BEAT 请求,然后向Broker发送:
public class MQClientAPIImpl {
public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// 创建HEART_BEAT请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
Broker在启动时注册了 HEART_BEAT 请求的处理器,可以看到请求处理器是 ClientManageProcessor :
public class BrokerController {
public void registerProcessor() {
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
// 注册HEART_BEAT请求的处理器ClientManageProcessor
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
}
}
进入到 ClientManageProcessor 的 processRequest 方法,如果请求是 HEART_BEAT 类型会调用 heartBeat 方法进行处理,这里也能看还有 UNREGISTER_CLIENT 类型的请求,从名字上可以看出是与取消注册有关的(这个稍后再说):
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 处理心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
}
进入到 heartBeat 方法,可以看到,调用了 ConsumerManager 的 registerConsumer 注册消费者:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
// ...
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// ...
// 注册Consumer
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
// ...
}
// ...
return response;
}
}
ConsumerManager 的 registerConsumer 方法的理逻辑如下:
ConsumerGroupInfo
对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息; CHANGE
变更事件(这个稍后再看); REGISTER
注册事件;
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根据组名称获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知变更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 注册Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
进入到 DefaultConsumerIdsChangeListener 的 handle 方法中,可以看到如果是 REGISTER 事件,会通过 ConsumerFilterManager 的 register 方法进行注册,注册的详细过程这里先不展开讲解:
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消费者变更事件
// ...
break;
case UNREGISTER: // 如果是取消注册事件
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER: // 如果是注册事件
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
// 进行注册
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
经过以上步骤之后,会调用MQClientInstance的 rebalanceImmediately 唤醒负载均衡服务进行一次负载均衡,为消费者分配消息队列, 需要注意的是负载均衡是由消费者端执行 :
// MQClientInstance
public class MQClientInstance {
private final RebalanceService rebalanceService;
public void rebalanceImmediately() {
// 唤醒负载均衡服务
this.rebalanceService.wakeup();
}
}
// RebalanceService
public class RebalanceService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
负载均衡的过程在 【RocketMQ】消息的拉取 一文中已经讲解,这里挑一些重点内容看一下.
在负载均衡的时候, 首先会获取当前消费者订阅的主题信息,对订阅的主题进行遍历,对每一个主题进行负载均衡 ,重新分配:
public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
// 获取订阅的主题信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历所有订阅的主题
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根据主题进行负载均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
rebalanceByTopic 方法中根据消费模式进行了判断然后对主题进行负载均衡,这里我们关注集群模式下的负载均衡:
从 topicSubscribeInfoTable 中根据 主题获取对应的消息队列集合,这一步可以得到主题下的所有消息队列信息 ; 。
根据主题信息和消费者组名称,获取 所有订阅了该主题的消费者ID集合,这一步得到了订阅该主题的所有消费者 ; 。
如果主题对应的消息队列集合和消费者ID都不为空, 对消息队列集合和消费ID集合进行排序,排序是为了接下来进行分配; 。
获取设置的分配策略,根据分配策略, 为消费者分配对应的消费队列 ,以平均分配策略为例,它会根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数:
根据最新分配的消息队列信息,调用 updateProcessQueueTableInRebalance 更新当前消费者消费的处理队列 ProcessQueue 信息 。
public abstract class RebalanceImpl {
// 根据主题进行负载均衡
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 广播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// ...
break;
}
case CLUSTERING: { // 集群模式
// 根据主题获取订阅的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 获取所有订阅了该主题的消费者id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// ...
if (mqSet != null && cidAll != null) { // 如果都不为空
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 对消息队列排序
Collections.sort(mqAll);
// 对消费者排序
Collections.sort(cidAll);
// 获取分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根据分配策略,为消费者分配消费队列
allocateResult = strategy.allocate(
this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
// ...
}
// 分配给当前消费的消费队列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
// 将分配结果加入到结果集合中
allocateResultSet.addAll(allocateResult);
}
// 根据分配信息更新处理队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// ...
}
break;
}
default:
break;
}
}
}
负载均衡之后,消费者负责的消息队列有可能发生变化,一个消息队列 MessageQueue 对应一个处理队列 ProcessQueue , processQueueTable 记录了消费者负责的队列信息,此时需要对其进行更新,处理逻辑如下:
对 processQueueTable 进行遍历,处理每一个消息队列,这一步主要是判断重新分配之后, processQueueTable 中记录的某些消息队列是否已经不再由当前消费者负责,如果是需要将消息队列置为dropped,表示删除 ,之后消费者不再从此消费队列中拉取消息; 。
判断是否有新分配给当前消费者的消息队列 ,如果某个消息队列在最新分配给当前消费者的消息队列集合 mqSet 中,但是不在 processQueueTable 中, 。
中,进行以下处理:
processQueueTable
PullRequest
,设置消息的拉取信息,并加入到拉取消息请求集合 pullRequestList
中 经过这一步, 如果分配给当前消费者的消费队列不在 processQueueTable 中,就会构建拉取请求 PullRequest ,然后调用dispatchPullRequest处理消息拉取请求,之后会从该消息队列拉取消息 ,详细过程可参考 【RocketMQ】消息的拉取 .
public abstract class RebalanceImpl {
// 处理队列表,KEY为消息队列,VALUE为对应的处理信息
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 负载均衡,topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 处理队列表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
// 获取消息队列
MessageQueue mq = next.getKey();
// 获取处理队列
ProcessQueue pq = next.getValue();
// 主题是否一致
if (mq.getTopic().equals(topic)) {
// 如果队列集合中不包含当前的队列
if (!mqSet.contains(mq)) {
// 设置为dropped
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // 是否过期
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true); // 设置为删除
// ...
break;
default:
break;
}
}
}
}
// 创建拉取请求集合
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍历本次分配的消息队列集合
for (MessageQueue mq : mqSet) {
// 如果之前不在processQueueTable中
if (!this.processQueueTable.containsKey(mq)) {
// ...
// 创建ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
// 计算消息拉取偏移量
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
// 如果偏移量大于等于0
if (nextOffset >= 0) {
// 放入处理队列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
// 如果之前已经存在,不需要进行处理
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,构建PullRequest,之后会加入到阻塞队列中,进行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);// 设置消费组
pullRequest.setNextOffset(nextOffset);// 设置拉取偏移量
pullRequest.setMessageQueue(mq);// 设置消息队列
pullRequest.setProcessQueue(pq);// 设置处理队列
pullRequestList.add(pullRequest);// 加入到拉取消息请求集合
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 添加消息拉取请求
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
在文章开头已经讲过,消费者在启动时会进行一次负载均衡,这里便不再赘述.
在消费者注册时讲到,如果发现消费者有变更会触发变更事件,当处于以下两种情况之一时会被判断为消费者发生了变化,需要进行负载均衡:
当前注册的消费者对应的Channel对象之前不存在; 。
当前注册的消费者订阅的主题信息发生了变化,也就是消费者订阅的主题有新增或者删除; 。
public class ConsumerManager {
/**
* 注册消费者
* @param group 消费者组名称
* @param clientChannelInfo 注册的消费者对应的Channel信息
* @param consumeType 消费类型
* @param messageModel
* @param consumeFromWhere 消费消息的位置
* @param subList 消费者订阅的主题信息
* @param isNotifyConsumerIdsChangedEnable 是否通知变更
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根据组名称获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果为空新增
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知变更,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 注册Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
在 updateChannel 方法中,首先将变更状态 updated 初始化为false,然后根据消费者的channel从 channelInfoTable 路由表中获取对应的 ClientChannelInfo 对象:
如果ClientChannelInfo对象获取为空,表示之前不存在该消费者的channel信息,将其加入到路由表中,变更状态置为true,表示消费者有变化; 。
如果获取不为空,判断clientid是否一致,如果不一致更新为最新的channel信息,但是变更状态 updated 不发生变化; 。
也就是说, 如果注册的消费者之前不存在,那么将变更状态置为true,表示消费者数量发生了变化.
// key为消费者对应的channle,value为chanel信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false; // 变更状态初始化为false
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
// 从channelInfoTable中获取对应的Channel信息,
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
if (null == infoOld) { // 如果为空
// 新增
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) { // 如果之前不存在
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
// 变更状态置为true
updated = true;
}
infoOld = infoNew;
} else {
// 如果之前存在,判断clientid是否一致,如果不一致更新为最新的channel
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
return updated;
}
updateSubscription 方法中,主要判断了消费的主题订阅信息是否发生了变化,subscriptionTable中记录了之前记录的订阅信息:
如果消费者订阅的主题发生了变化,比如有新增加的主题或者删除了某个主题的订阅,会被判断为主题订阅信息发生了变化.
public class ConsumerGroupInfo {
// 记录了订阅的主题信息,key为topic,value为订阅信息
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
// 遍历订阅的主题信息
for (SubscriptionData sub : subList) {
//根据主题获取订阅信息
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
// 如果获取为空
if (old == null) {
// 加入到subscriptionTable
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true; // 变更状态置为true
log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本发生了变化
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
}
// 更新为最新的订阅信息
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
// 进行遍历,这一步主要是判断有没有取消订阅的主题
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
// 遍历最新的订阅信息
for (SubscriptionData sub : subList) {
// 如果在旧的订阅信息中存在就终止,继续判断下一个主题
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
// 走到这里,表示有取消订阅的主题
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
// 进行删除
it.remove();
// 变更状态置为true
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
}
上面讲解了两种被判定为消费者发生变化的情况,被判定为变化之后,会触调用 DefaultConsumerIdsChangeListener 中的 handle 方法触发变更事件,在方法中传入了消费者组下的所有消费者的channel对象,会发送变更请求通知该消费者组下的所有消费者,进行负载均衡.
DefaultConsumerIdsChangeListener 中处理变更事件时,会对消费组下的所有消费者遍历,调用 notifyConsumerIdsChanged 方法向每一个消费者发送变更请求
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// ...
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 触发变更事件,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// ...
}
}
// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消费者变更事件
case CHANGE:
if (args == null || args.length < 1) {
return;
}
// 获取所有的消费者对应的channel
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
// 向每一个消费者发送变更请求
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
// ...
}
}
}
请求的发送是在 Broker2Client 的 notifyConsumerIdsChanged 方法中实现的,可以看到会创建 NOTIFY_CONSUMER_IDS_CHANGED 请求并发送:
public class Broker2Client {
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
}
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
// 创建变更通知请求
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
// 发送请求
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
}
}
}
消费者对Broker发送的 NOTIFY_CONSUMER_IDS_CHANGED 的请求处理在 ClientRemotingProcessor 的 processRequest 方法中,它会调用 notifyConsumerIdsChanged 方法进行处理,在 notifyConsumerIdsChanged 方法中可以看到触发了一次负载均衡:
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // NOTIFY_CONSUMER_IDS_CHANGED请求处理
// 处理变更请求
return this.notifyConsumerIdsChanged(ctx, request);
// ...
default:
break;
}
return null;
}
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
// ...
// 触发负载均衡
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
}
return null;
}
}
消费者在停止时,需要将当前消费者负责的消息队列分配给其他消费者进行消费,所以在 shutdown 方法中会调用 MQClientInstance 的 unregisterConsumer 方法取消注册:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
// 取消注册
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
// ...
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
在 unregisterConsumer 方法中,又调用了 unregisterClient 方法取消注册,与注册消费者的逻辑相似,它会向所有的Broker发送取消注册的请求:
public class MQClientInstance {
public synchronized void unregisterConsumer(final String group) {
this.consumerTable.remove(group);
// 取消注册
this.unregisterClient(null, group);
}
private void unregisterClient(final String producerGroup, final String consumerGroup) {
// 获取所有的Broker
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
// 进行遍历
while (it.hasNext()) {
// ...
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
String addr = entry1.getValue();
if (addr != null) {
try {
// 发送取消注册请求
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
// ...
} // ...
}
}
}
}
}
}
取消注册请求的发送是在 MQClientAPIImpl 中 unregisterClient 方法实现的,可以看到构建了 UNREGISTER_CLIENT 请求并发送:
public class MQClientAPIImpl {
public void unregisterClient(final String addr, final String clientID, final String producerGroup, final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
// ...
requestHeader.setConsumerGroup(consumerGroup);
// 构建UNREGISTER_CLIENT请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
与注册消费者的请求处理一样,Broker对 UNREGISTER_CLIENT 的请求同样是在 ClientManageProcessor 的 processRequest 中处理的,对于 UNREGISTER_CLIENT 请求是调用unregisterClient方法处理的,里面又调用了 ConsumerManager 的 unregisterConsumer 方法进行取消注册:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 处理心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// ...
{
final String group = requestHeader.getConsumerGroup();
if (group != null) {
// ...
// 取消消费者的注册
this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
在 ConsumerManager 的 unregisterConsumer 方法中,可以看到触发了取消注册事件,之后如果开启了允许通知变更,会触发变更事件,变更事件在上面已经讲解过,它会通知消费者组下的所有消费者进行一次负载均衡:
public class ConsumerManager {
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
// 触发取消注册事件
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
// 触发消费者变更事件
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
}
在 RebalanceService 的run方法中,可以看到设置了等待时间,默认是20s,所以消费者本身也会定时执行负载均衡:
public class RebalanceService extends ServiceThread {
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval); // 等待
// 负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
总结 。
参考 。
田守枝-深入理解RocketMQ Rebalance机制 。
RocketMQ版本:4.9.3 。
最后此篇关于【RocketMQ】负载均衡源码分析的文章就讲到这里了,如果你想了解更多关于【RocketMQ】负载均衡源码分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
在 Web 应用程序架构设计期间,我必须从概念上计算我的服务器之一可以服务多少个当前客户端。然后我可以预算它。 那么,有什么公式可以遵循吗?或者,你如何计算这个?或者,通常,一个 httpd/tomc
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 要求我们推荐或查找书籍、工具、软件库、教程或其他场外资源的问题对于 Stack Overflow 来说是
我正在使用 Angular 5,我正在尝试在加载 div 的背景图像时获取加载图标。 如果它是一个普通的 img,我对此没有问题,但如果我尝试将它作为背景,它就不起作用。 这里是一些示例代码 app.
我们怎么知道我们的程序在 CPU 上有多少负载? 我尝试使用 htop 找到它。但是 htop 不会给 cpu 负载。它实际上给出了我程序的 cpu 利用率(使用 pid)。 我正在使用 C 编程,L
我们发现从Spark 1.3到当前的Spark 2.0.1以来,从Oracle数据库使用Spark的API加载数据一直很慢。典型的代码在Java中是这样的: Map options =
我有时会收到 mnesia overloaded主要使用时的错误消息 async_dirty查询和 ram_copies表。所以为了了解发生了什么,我想获得更多关于 mnesia 状态的信息,例如每秒
对于通常使用很少 CPU 的程序来说,内核 CPU 非常高。 Linux 机器在状态之间交替。大多数时候,程序使用低 CPU 正常执行。在 CPU“激增”期间,程序使用 100% 可用 CPU 使用高
我正在使用 Raspberry Pi 2 来路由 wifi-eth 连接。因此,从 eth 方面来看,我有一台可以使用 Pi wifi 连接到互联网的计算机。在 Raspberry 上我启动 htop
基本上我有一个网页,其中有一个 iframe 可以从不同的域加载另一个网页。它移动得很慢,我想证明整个页面很慢只是因为 iframe 内的页面。 有什么方法可以测量总页面负载以及总页面负载中有多少%来
我们有一个基于 Spring 的应用程序,它充当使用其他 Rest API 的编排层。我只想测试这个组件的性能,而不测试正在使用的下游 api。 我正在寻找有关如何完成此操作的任何架构建议? 当前的方
我正在学习 hibernate 。为了进行测试,我使用无效 key 调用了 session.load 。当我在调试器(JB Idea)中跨过该行后,没有任何反应 - 我预计会得到 ObjectNotF
我正在开发一个小型的待办事项 PHP 应用程序。我正在使用 jQuery 构建 HTML。其中一个是一个按钮,用于启动一个模式,允许用户编辑该项目。我很好奇加载数据时更好的方法是什么: 1) 在初始加
我尝试在 twitch 播放器中使用 angular 作为覆盖标记。 我将 ng-repear 与(键,值)结合使用。 //player is here 设置是一个全局对象。但是当我尝试加载页面
我即将了解 C 语言中的特定进程如何在特定时间范围内加载 CPU。该进程可能会在运行时切换处理器核心,因此我也需要处理这个问题。 CPU为ARM处理器。 我研究了从标准顶部获取负载的不同方法,perf
这个问题在这里已经有了答案: XMLHttpRequest Origin null is not allowed Access-Control-Allow-Origin for file:/// t
您好,我正在用 Java 开发负载平衡算法。在我的系统中将有一个主节点和 n 个从节点。主节点将接收查询分发给它的从节点。但是在将查询分发到其从节点之一之前,我想测量从节点中的当前负载,以检查特定从节
我正在渲染由大约 50 万个三角形组成的相当重的对象。我使用 opengl 显示列表,在渲染方法中只调用 glCallList。我认为一旦图形基元被编译成显示列表,cpu 的工作就完成了,它只是告诉
我正在尝试加密 Sipdroid,为此我必须在 RTP 数据包获得编码的音频负载后对其进行加密。我在 RTP 数据包类中使用这个函数: public byte[] getPayload() {
我正在尝试解析以下 JSON 负载: { "results":[ [ 298.648132, 280.68692, 356.54
在动画期间 cpu 负载非常高(高达 75%) 是否有优化代码以降低 CPU 负载的方法? 我的代码: ImageView myImageView = (ImageView)findViewById(
我是一名优秀的程序员,十分优秀!