- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从 基础概念 、 实现机制 、 实战案例 、 注意事项 四个方面一一展开,希望能帮助到大家.
RocketMQ 支持两种消息模式: 集群消费 ( Clustering )和 广播消费 ( Broadcasting ).
集群消费 :
同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上.
广播消费 :
当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次.
首先下图展示了广播消费的代码示例.
public class PushConsumer {
public static final String CONSUMER_GROUP = "myconsumerGroup";
public static final String DEFAULT_NAMESRVADDR = "localhost:9876";
public static final String TOPIC = "mytest";
public static final String SUB_EXPRESSION = "TagA || TagC || TagD";
public static void main(String[] args) throws InterruptedException, MQClientException {
// 定义 DefaultPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 定义名字服务地址
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 定义消费读取位点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 定义消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅主题信息
consumer.subscribe(TOPIC, SUB_EXPRESSION);
// 订阅消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
}catch (Exception e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
和集群消费不同的点在于下面的代码:
consumer.setMessageModel(MessageModel.BROADCASTING);
接下来,我们从源码角度来看看广播消费和集群消费有哪些差异点 ?
首先进入 DefaultMQPushConsumerImpl 类的 start 方法 , 分析启动流程中他们两者的差异点:
▍ 差异点1:拷贝订阅关系 。
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
// 注意下面的代码 , 集群模式下自动订阅重试主题
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
在集群模式下,会自动订阅重试队列,而广播模式下,并没有这段代码。也就是说 广播模式下,不支持消息重试 .
▍ 差异点2:本地进度存储 。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
我们可以看到消费进度存储的对象是: LocalFileOffsetStore , 进度文件存储在如下的主目录 /{用户主目录}/.rocketmq_offsets .
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
进度文件是 /mqClientId/{consumerGroupName}/offsets.json .
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";
笔者创建了一个主题 mytest , 包含4个队列,进度文件内容如下:
消费者启动后,我们可以将整个流程简化如下图,并继续整理差异点:
▍ 差异点3:负载均衡消费该主题的所有 MessageQueue 。
进入负载均衡抽象类 RebalanceImpl 的 rebalanceByTopic 方法 .
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
// 省略代码
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// 省略代码
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
// 省略日志打印代码
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//省略代码
}
break;
}
default:
break;
}
}
从上面代码我们可以看到消息模式为广播消费模式时,消费者会订阅该主题下所有的 messageQueue ,这一点也可以从本地的进度文件 offsets.json 得到印证.
▍ 差异点4:不支持顺序消息 。
顺序消费会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试.
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
从上面的代码,我们发现只有在集群消费的时候才会定时申请锁,这样就会导致广播消费时,无法为负载均衡的队列申请锁,导致拉取消息服务一直无法获取消息数据.
为了再次验证,我们修改例子,消费模式从并发消费修改为顺序消费 .
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
}catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
});
从图中,笔者观察到拉取消息的线程无法发起拉取消息请求到 Broker ,因为负载均衡后的队列无法获取到锁.
因此, 广播消费模式并不支持顺序消息 .
▍ 差异点5:并发消费消费失败时,没有重试 。
进入并发消息消费类 ConsumeMessageConcurrentlyService 的处理消费结果方法 processConsumeResult .
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
消费消息失败后,集群消费时,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端.
但在广播模式下,仅仅是打印了消息信息。因此, 广播模式下,并没有消息重试 .
笔者第一次接触广播消费的业务场景是神州专车司机端消息推送。 用户下单之后,订单系统生成专车订单,派单系统会根据相关算法将订单派给某司机,司机端就会收到派单推送.
推送架构图如下:
司机端启动后,会通过负载均衡和推送服务创建长连接,推送服务会保存 TCP 连接引用 (比如司机编号和 TCP channel 的引用).
推送服务是一个 TCP 服务(自定义协议),同时也是一个消费者服务,消息模式是广播消费.
派单服务是生产者,将派单数据发送到 MetaQ , 每个推送服务都会消费到该消息,推送服务判断本地内存中是否存在该司机的 TCP channel , 若存在,则通过 TCP 连接将数据推送给司机端.
肯定有同学会问:假如网络原因,推送失败怎么处理 ?有两个要点:
- 司机端定时主动拉取派单信息;
- 当推送服务没有收到司机端的 ACK 时 ,也会一定时限内再次推送,达到阈值后,不再推送。
集群消费和广播消费模式下,各功能的支持情况如下:
功能 | 集群消费 | 广播消费 |
---|---|---|
顺序消息 | 支持 | 不支持 |
重置消费位点 | 支持 | 不支持 |
消息重试 | 支持 | 不支持 |
消费进度 | 服务端维护 | 客户端维护 |
参考资料 :
https://www.51cto.com/article/714277.html 。
https://ost.51cto.com/posts/21100 。
如果我的文章对你有所帮助,还请帮忙 点赞、在看、转发 一下,你的支持会激励我输出更高质量的文章,非常感谢! 。
最后此篇关于深入理解RocketMQ广播消费的文章就讲到这里了,如果你想了解更多关于深入理解RocketMQ广播消费的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
最近做一个项目,由于是在别人框架里开发app,导致了很多限制,其中一个就是不能直接引用webservice 。 我们都知道,调用webserivice 最简单的方法就是在 "引用"
这是SDL2代码的一部分 SDL主函数 int main(int argc,char *argv[]) { ... ... bool quit=false; S
c 中的函数: PHPAPI char *php_pcre_replace(char *regex, int regex_len, ch
我有以下映射: public class SecurityMap : ClassMap { public SecurityMap() {
我在vue-lic3中使用了SCSS,但是有一个奇怪的错误,使用/ deep /会报告错误,我不想看到它。 代码运行环境 vue-cli3 + vant + scss 的CSS /deep/ .van
我在深入阅读 C# 时遇到了这个我能理解的内容: 当它被限制为引用类型时,执行的比较类型完全取决于类型参数被限制为什么。 但是不能理解这个: 如果进一步限制派生自重载 == 和 != 运算符的特定类型
Closed. This question is opinion-based。它当前不接受答案。 想改善这个问题吗?更新问题,以便editing this post用事实和引用来回答。 3年前关闭。
有人可以详细介绍关于自赋值的运算符重载中的 *this 和 const 例如: Class& Class::operator=(const Class& other) { a = other.
在向树中插入新节点时,如何填充闭包表的深度/长度列? ancestor 和 descendant 中的值是来自另一个表的 ID,表示要以树结构排列的页面。 关闭表: ancestor desce
现在我正在阅读“深入了解 C#”。缺少的一件事是完成一章后我可以解决的一系列问题。那会帮助我理解我刚刚学到的概念。 哪里可以找到适合 C#3.0 的问题集? 谢谢 最佳答案 你可以试试LINQ 101
TypeScript 给 JavaScript 扩展了类型的语法,我们可以给变量加上类型,在编译期间会做类型检查,配合编辑器还能做更准确的智能提示。此外,TypeScript 还支持了高级类型用
是否有一个单行代码来获取生成器并生成该生成器中的所有元素?例如: def Yearly(year): yield YEARLY_HEADER for month in range(1, 13)
所以我阅读了一些与“什么是方法组”相关的 StackOverflow 问题以及其他互联网文章,它们在底线都说了同样的话——方法组是“一组重载方法” ". 但是,在阅读 Jon Skeet 的“C# 深
有什么方法可以从子组件中获取子组件吗? 想象一下以下组件树: 应用程序 问题 问题选项(包含复选框) 问题选项(包含复选框) 问题选项(包含复选框) 我想从 App 访问问题选项以选中所有复选框。 参
class_eval 和 instance_eval 在定义方法等情况下是完全可以预测的。我也理解类的实例和类的单例(又名特征类)之间的区别。 但是 我无法弄清楚以下唯一的事情:比方说,出于某些策略目
我想出了如何将符号 rwx 部分读取/转换为 421 个八进制部分,这非常简单。但是当涉及到特殊字符时,我感到非常困惑。我们知道 -r-xr---wx 转换为 0543,但 -r-sr---wt 或
我怀疑我系统的 Java 版本有问题。某些应用程序出现段错误或内存不足或存在链接错误。如果我从源代码安装了 JDK,我会做类似“make test”的事情,看看哪些测试失败了。但是,看起来从源代码构建
如何克隆一个 repo(使用 libgit2 ) 我想做什么git clone确实,但有 libgit2 .我可能要问的是什么 git clone确实很深入。 这是我目前正在做的: 初始化一个repo
00、头痛的JS闭包、词法作用域? 被JavaScript的闭包、上下文、嵌套函数、this搞得很头痛,这语言设计的,感觉比较混乱,先勉强理解总结一下😂😂😂.
我开始玩 lubridate R 中的包。我注意到 now(tzone="EST")计算为: [1] "2015-08-25 13:01:08 EST" 而 now(tzone="PST")导致警告:
我是一名优秀的程序员,十分优秀!