- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡.
接下来以集群模式下的消息推模式 DefaultMQPushConsumerImpl 为例,看一下负载均衡的过程.
首先,消费者在启动时会做如下操作:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
// ...
// 更新当前消费者订阅主题的路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 向Broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒负载均衡服务
this.mQClientFactory.rebalanceImmediately();
}
}
为了保证消费者拿到的主题路由信息是最新的(topic下有几个消息队列、消息队列的分布信息等),在进行负载均衡之前首先要更新主题的路由信息,在 updateTopicSubscribeInfoWhenSubscriptionChanged 方法中可以看到,首先获取了当前消费者订阅的所有主题信息(一个消费者可以订阅多个主题),然后进行遍历,向NameServer发送请求,更新每一个主题的路由信息,保证路由信息是最新的:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
// 获取当前消费者订阅的主题信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历订阅的主题信息
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
// 从NameServer更新主题的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
}
由于Broker需要感知消费者数量的增减,所以每个消费者在启动的时候,会调用 sendHeartbeatToAllBrokerWithLock 向Broker发送心跳包,进行消费者注册:
public class MQClientInstance {
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 调用sendHeartbeatToAllBroker向Broker发送心跳
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
}
在 sendHeartbeatToAllBroker 方法中,可以看到从 brokerAddrTable 中获取了所有的Broker进行遍历(主从模式下也会向从节点发送请求注册),调用 MQClientAPIImpl 的 sendHearbeat 方法向每一个Broker发送心跳请求进行注册:
public class MQClientInstance {
// Broker路由表
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// 发送心跳
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
// ...
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
// 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey(); // broker name
// 获取同一个Broker Name下的所有Broker实例
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
// 遍历所有的实例
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) { // 如果地址不为空
// ...
try {
// 发送心跳
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
// ...
} catch (Exception e) {
// ...
}
}
}
}
}
}
}
}
在 MQClientAPIImpl 的 sendHearbeat 方法中,可以看到构建了 HEART_BEAT 请求,然后向Broker发送:
public class MQClientAPIImpl {
public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// 创建HEART_BEAT请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
Broker在启动时注册了 HEART_BEAT 请求的处理器,可以看到请求处理器是 ClientManageProcessor :
public class BrokerController {
public void registerProcessor() {
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
// 注册HEART_BEAT请求的处理器ClientManageProcessor
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
}
}
进入到 ClientManageProcessor 的 processRequest 方法,如果请求是 HEART_BEAT 类型会调用 heartBeat 方法进行处理,这里也能看还有 UNREGISTER_CLIENT 类型的请求,从名字上可以看出是与取消注册有关的(这个稍后再说):
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 处理心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
}
进入到 heartBeat 方法,可以看到,调用了 ConsumerManager 的 registerConsumer 注册消费者:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
// ...
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// ...
// 注册Consumer
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
// ...
}
// ...
return response;
}
}
ConsumerManager 的 registerConsumer 方法的理逻辑如下:
ConsumerGroupInfo
对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息; CHANGE
变更事件(这个稍后再看); REGISTER
注册事件;
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根据组名称获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知变更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 注册Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
进入到 DefaultConsumerIdsChangeListener 的 handle 方法中,可以看到如果是 REGISTER 事件,会通过 ConsumerFilterManager 的 register 方法进行注册,注册的详细过程这里先不展开讲解:
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消费者变更事件
// ...
break;
case UNREGISTER: // 如果是取消注册事件
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER: // 如果是注册事件
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
// 进行注册
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
经过以上步骤之后,会调用MQClientInstance的 rebalanceImmediately 唤醒负载均衡服务进行一次负载均衡,为消费者分配消息队列, 需要注意的是负载均衡是由消费者端执行 :
// MQClientInstance
public class MQClientInstance {
private final RebalanceService rebalanceService;
public void rebalanceImmediately() {
// 唤醒负载均衡服务
this.rebalanceService.wakeup();
}
}
// RebalanceService
public class RebalanceService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
负载均衡的过程在 【RocketMQ】消息的拉取 一文中已经讲解,这里挑一些重点内容看一下.
在负载均衡的时候, 首先会获取当前消费者订阅的主题信息,对订阅的主题进行遍历,对每一个主题进行负载均衡 ,重新分配:
public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
// 获取订阅的主题信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历所有订阅的主题
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根据主题进行负载均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
rebalanceByTopic 方法中根据消费模式进行了判断然后对主题进行负载均衡,这里我们关注集群模式下的负载均衡:
从 topicSubscribeInfoTable 中根据 主题获取对应的消息队列集合,这一步可以得到主题下的所有消息队列信息 ; 。
根据主题信息和消费者组名称,获取 所有订阅了该主题的消费者ID集合,这一步得到了订阅该主题的所有消费者 ; 。
如果主题对应的消息队列集合和消费者ID都不为空, 对消息队列集合和消费ID集合进行排序,排序是为了接下来进行分配; 。
获取设置的分配策略,根据分配策略, 为消费者分配对应的消费队列 ,以平均分配策略为例,它会根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数:
根据最新分配的消息队列信息,调用 updateProcessQueueTableInRebalance 更新当前消费者消费的处理队列 ProcessQueue 信息 。
public abstract class RebalanceImpl {
// 根据主题进行负载均衡
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 广播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// ...
break;
}
case CLUSTERING: { // 集群模式
// 根据主题获取订阅的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 获取所有订阅了该主题的消费者id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// ...
if (mqSet != null && cidAll != null) { // 如果都不为空
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 对消息队列排序
Collections.sort(mqAll);
// 对消费者排序
Collections.sort(cidAll);
// 获取分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根据分配策略,为消费者分配消费队列
allocateResult = strategy.allocate(
this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
// ...
}
// 分配给当前消费的消费队列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
// 将分配结果加入到结果集合中
allocateResultSet.addAll(allocateResult);
}
// 根据分配信息更新处理队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// ...
}
break;
}
default:
break;
}
}
}
负载均衡之后,消费者负责的消息队列有可能发生变化,一个消息队列 MessageQueue 对应一个处理队列 ProcessQueue , processQueueTable 记录了消费者负责的队列信息,此时需要对其进行更新,处理逻辑如下:
对 processQueueTable 进行遍历,处理每一个消息队列,这一步主要是判断重新分配之后, processQueueTable 中记录的某些消息队列是否已经不再由当前消费者负责,如果是需要将消息队列置为dropped,表示删除 ,之后消费者不再从此消费队列中拉取消息; 。
判断是否有新分配给当前消费者的消息队列 ,如果某个消息队列在最新分配给当前消费者的消息队列集合 mqSet 中,但是不在 processQueueTable 中, 。
中,进行以下处理:
processQueueTable
PullRequest
,设置消息的拉取信息,并加入到拉取消息请求集合 pullRequestList
中 经过这一步, 如果分配给当前消费者的消费队列不在 processQueueTable 中,就会构建拉取请求 PullRequest ,然后调用dispatchPullRequest处理消息拉取请求,之后会从该消息队列拉取消息 ,详细过程可参考 【RocketMQ】消息的拉取 .
public abstract class RebalanceImpl {
// 处理队列表,KEY为消息队列,VALUE为对应的处理信息
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 负载均衡,topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 处理队列表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
// 获取消息队列
MessageQueue mq = next.getKey();
// 获取处理队列
ProcessQueue pq = next.getValue();
// 主题是否一致
if (mq.getTopic().equals(topic)) {
// 如果队列集合中不包含当前的队列
if (!mqSet.contains(mq)) {
// 设置为dropped
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // 是否过期
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true); // 设置为删除
// ...
break;
default:
break;
}
}
}
}
// 创建拉取请求集合
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍历本次分配的消息队列集合
for (MessageQueue mq : mqSet) {
// 如果之前不在processQueueTable中
if (!this.processQueueTable.containsKey(mq)) {
// ...
// 创建ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
// 计算消息拉取偏移量
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
// 如果偏移量大于等于0
if (nextOffset >= 0) {
// 放入处理队列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
// 如果之前已经存在,不需要进行处理
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,构建PullRequest,之后会加入到阻塞队列中,进行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);// 设置消费组
pullRequest.setNextOffset(nextOffset);// 设置拉取偏移量
pullRequest.setMessageQueue(mq);// 设置消息队列
pullRequest.setProcessQueue(pq);// 设置处理队列
pullRequestList.add(pullRequest);// 加入到拉取消息请求集合
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 添加消息拉取请求
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
在文章开头已经讲过,消费者在启动时会进行一次负载均衡,这里便不再赘述.
在消费者注册时讲到,如果发现消费者有变更会触发变更事件,当处于以下两种情况之一时会被判断为消费者发生了变化,需要进行负载均衡:
当前注册的消费者对应的Channel对象之前不存在; 。
当前注册的消费者订阅的主题信息发生了变化,也就是消费者订阅的主题有新增或者删除; 。
public class ConsumerManager {
/**
* 注册消费者
* @param group 消费者组名称
* @param clientChannelInfo 注册的消费者对应的Channel信息
* @param consumeType 消费类型
* @param messageModel
* @param consumeFromWhere 消费消息的位置
* @param subList 消费者订阅的主题信息
* @param isNotifyConsumerIdsChangedEnable 是否通知变更
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根据组名称获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果为空新增
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知变更,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 注册Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
在 updateChannel 方法中,首先将变更状态 updated 初始化为false,然后根据消费者的channel从 channelInfoTable 路由表中获取对应的 ClientChannelInfo 对象:
如果ClientChannelInfo对象获取为空,表示之前不存在该消费者的channel信息,将其加入到路由表中,变更状态置为true,表示消费者有变化; 。
如果获取不为空,判断clientid是否一致,如果不一致更新为最新的channel信息,但是变更状态 updated 不发生变化; 。
也就是说, 如果注册的消费者之前不存在,那么将变更状态置为true,表示消费者数量发生了变化.
// key为消费者对应的channle,value为chanel信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false; // 变更状态初始化为false
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
// 从channelInfoTable中获取对应的Channel信息,
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
if (null == infoOld) { // 如果为空
// 新增
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) { // 如果之前不存在
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
// 变更状态置为true
updated = true;
}
infoOld = infoNew;
} else {
// 如果之前存在,判断clientid是否一致,如果不一致更新为最新的channel
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
return updated;
}
updateSubscription 方法中,主要判断了消费的主题订阅信息是否发生了变化,subscriptionTable中记录了之前记录的订阅信息:
如果消费者订阅的主题发生了变化,比如有新增加的主题或者删除了某个主题的订阅,会被判断为主题订阅信息发生了变化.
public class ConsumerGroupInfo {
// 记录了订阅的主题信息,key为topic,value为订阅信息
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
// 遍历订阅的主题信息
for (SubscriptionData sub : subList) {
//根据主题获取订阅信息
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
// 如果获取为空
if (old == null) {
// 加入到subscriptionTable
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true; // 变更状态置为true
log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本发生了变化
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
}
// 更新为最新的订阅信息
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
// 进行遍历,这一步主要是判断有没有取消订阅的主题
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
// 遍历最新的订阅信息
for (SubscriptionData sub : subList) {
// 如果在旧的订阅信息中存在就终止,继续判断下一个主题
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
// 走到这里,表示有取消订阅的主题
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
// 进行删除
it.remove();
// 变更状态置为true
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
}
上面讲解了两种被判定为消费者发生变化的情况,被判定为变化之后,会触调用 DefaultConsumerIdsChangeListener 中的 handle 方法触发变更事件,在方法中传入了消费者组下的所有消费者的channel对象,会发送变更请求通知该消费者组下的所有消费者,进行负载均衡.
DefaultConsumerIdsChangeListener 中处理变更事件时,会对消费组下的所有消费者遍历,调用 notifyConsumerIdsChanged 方法向每一个消费者发送变更请求
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// ...
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 触发变更事件,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// ...
}
}
// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消费者变更事件
case CHANGE:
if (args == null || args.length < 1) {
return;
}
// 获取所有的消费者对应的channel
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
// 向每一个消费者发送变更请求
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
// ...
}
}
}
请求的发送是在 Broker2Client 的 notifyConsumerIdsChanged 方法中实现的,可以看到会创建 NOTIFY_CONSUMER_IDS_CHANGED 请求并发送:
public class Broker2Client {
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
}
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
// 创建变更通知请求
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
// 发送请求
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
}
}
}
消费者对Broker发送的 NOTIFY_CONSUMER_IDS_CHANGED 的请求处理在 ClientRemotingProcessor 的 processRequest 方法中,它会调用 notifyConsumerIdsChanged 方法进行处理,在 notifyConsumerIdsChanged 方法中可以看到触发了一次负载均衡:
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // NOTIFY_CONSUMER_IDS_CHANGED请求处理
// 处理变更请求
return this.notifyConsumerIdsChanged(ctx, request);
// ...
default:
break;
}
return null;
}
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
// ...
// 触发负载均衡
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
}
return null;
}
}
消费者在停止时,需要将当前消费者负责的消息队列分配给其他消费者进行消费,所以在 shutdown 方法中会调用 MQClientInstance 的 unregisterConsumer 方法取消注册:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
// 取消注册
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
// ...
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
在 unregisterConsumer 方法中,又调用了 unregisterClient 方法取消注册,与注册消费者的逻辑相似,它会向所有的Broker发送取消注册的请求:
public class MQClientInstance {
public synchronized void unregisterConsumer(final String group) {
this.consumerTable.remove(group);
// 取消注册
this.unregisterClient(null, group);
}
private void unregisterClient(final String producerGroup, final String consumerGroup) {
// 获取所有的Broker
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
// 进行遍历
while (it.hasNext()) {
// ...
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
String addr = entry1.getValue();
if (addr != null) {
try {
// 发送取消注册请求
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
// ...
} // ...
}
}
}
}
}
}
取消注册请求的发送是在 MQClientAPIImpl 中 unregisterClient 方法实现的,可以看到构建了 UNREGISTER_CLIENT 请求并发送:
public class MQClientAPIImpl {
public void unregisterClient(final String addr, final String clientID, final String producerGroup, final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
// ...
requestHeader.setConsumerGroup(consumerGroup);
// 构建UNREGISTER_CLIENT请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
与注册消费者的请求处理一样,Broker对 UNREGISTER_CLIENT 的请求同样是在 ClientManageProcessor 的 processRequest 中处理的,对于 UNREGISTER_CLIENT 请求是调用unregisterClient方法处理的,里面又调用了 ConsumerManager 的 unregisterConsumer 方法进行取消注册:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 处理心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// ...
{
final String group = requestHeader.getConsumerGroup();
if (group != null) {
// ...
// 取消消费者的注册
this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
在 ConsumerManager 的 unregisterConsumer 方法中,可以看到触发了取消注册事件,之后如果开启了允许通知变更,会触发变更事件,变更事件在上面已经讲解过,它会通知消费者组下的所有消费者进行一次负载均衡:
public class ConsumerManager {
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
// 触发取消注册事件
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
// 触发消费者变更事件
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
}
在 RebalanceService 的run方法中,可以看到设置了等待时间,默认是20s,所以消费者本身也会定时执行负载均衡:
public class RebalanceService extends ServiceThread {
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval); // 等待
// 负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
总结 。
参考 。
田守枝-深入理解RocketMQ Rebalance机制 。
RocketMQ版本:4.9.3 。
最后此篇关于【RocketMQ】负载均衡源码分析的文章就讲到这里了,如果你想了解更多关于【RocketMQ】负载均衡源码分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
ACO.Visualization项目 本项目演示蚁群算法求解旅行商问题的可视化过程,包括路径上的信息素浓度、蚁群的运动过程等。项目相关的代码:https://github.com/anycad/A
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 7 年前。
我需要用Sql数据库制作并包含的PHP票务系统源码用户客户端和管理员。我需要个人 CMS 的这个来源。谢谢你帮助我。 最佳答案 我在不同的情况下使用了 osticket。 这里: http://ost
我的场景:我想在日志文件中写入发生异常的部分代码(例如,发生异常的行前 5 行和行后 5 行 - 或者至少是该方法的所有代码)。 我的想法是用 C# 代码反编译 pdb 文件,并从该反编译文件中找到一
RocketMQ设定了延迟级别可以让消息延迟消费,延迟消息会使用 SCHEDULE_TOPIC_XXXX 这个主题,每个延迟等级对应一个消息队列,并且与普通消息一样,会保存每个消息队列的消费进度
先附上Hystrix源码图 在微服务架构中,根据业务来拆分成一个个的服务,服务与服务之间可以相互调用(RPC),在Spring Cloud可以用RestTemplate+Ribbon和
此篇博客学习的api如标题,分别是: current_url 获取当前页面的url; page_source 获取当前页面的源码; title 获取当前页面的titl
? 1 2
1、前言 作为一个数据库爱好者,自己动手写过简单的sql解析器以及存储引擎,但感觉还是不够过瘾。<<事务处理-概念与技术>>诚然讲的非常透彻,但只能提纲挈领,不能让你
gory"> 目录 运行时信号量机制 semaphore 前言 作用是什么 几个主要的方法 如何实现
自己写的一个评论系统源码分享给大家,包括有表情,还有评论机制。用户名是随机的 针对某一篇文章进行评论 function subcomment() {
一、概述 StringBuilder是一个可变的字符串序列,这个类被设计去兼容StringBuffer类的API,但不保证线程安全性,是StringBuffer单线程情况下的一个替代实现。在可能的情
一、概述 System是用的非常多的一个final类。它不能被实例化。System类提供了标准的输入输出和错误输出流;访问外部定义的属性和环境变量;加载文件和库的方法;以及高效的拷贝数组中一部分元素
在JDK中,String的使用频率和被研究的程度都非常高,所以接下来我只说一些比较重要的内容。 一、String类的概述 String类的声明如下: public final class Str
一、概述 Class的实例代表着正在运行的Java应用程序的类和接口。枚举是一种类,而直接是一种接口。每一个数组也属于一个类,这个类b被反射为具有相同元素类型和维数的所有数组共享的类对象。八大基本树
一、概述 Compiler这个类被用于支持Java到本地代码编译器和相关服务。在设计上,这个类啥也不做,他充当JIT编译器实现的占位符。 放JVM虚拟机首次启动时,他确定系统属性java.comp
一、概述 StringBuffer是一个线程安全的、可变的字符序列,跟String类似,但它能被修改。StringBuffer在多线程环境下可以很安全地被使用,因为它的方法都是通过synchroni
一、概述 Enum是所有Jav中枚举类的基类。详细的介绍在Java语言规范中有说明。 值得注意的是,java.util.EnumSet和java.util.EnumMap是Enum的两个高效实现,
一、概述 此线程指的是执行程序中的线程。 Java虚拟机允许应用程序同时执行多个执行线程。 每个线程都有优先权。 具有较高优先级的线程优先于优先级较低的线程执行。 每个线程可能也可能不会被标记为守
一、抽象类Number 类继承关系 这里面的原子类、BigDecimal后面都会详细介绍。 属性和抽象方法 二、概述 所有的属性,最小-128,最大127,SIZE和BYTES代码比
我是一名优秀的程序员,十分优秀!