- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用.
局部有序 假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID,比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID,在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列,这样同一个订单的数据就可以保证发送到一个消息队列中,消费者端使用 MessageListenerOrderly 处理有序消息,这就是RocketMQ的局部有序,保证消息在某个消息队列中有序.
接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):
生产者 。
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("生产者组");
// 启动
producer.start();
// 创建TAG
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
// 生成订单ID
int orderId = i % 10;
// 创建消息
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 获取订单ID
Integer id = (Integer) arg;
// 对消息队列个数取余
int index = id % mqs.size();
// 根据取余结果选择消息要发送给哪个消息队列
return mqs.get(index);
}
}, orderId); // 这里传入了订单ID
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
消费者 。
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅主题
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
// 注册消息监听器,使用的是MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
// 打印消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID相关消息有序发送,接下来就看消费者是如何保证消息的顺序消费的.
消费者在启动的时候,会对是否是顺序消费进行判断(监听器是否是 MessageListenerOrderly 类型来判断),如果是顺序消费,会使用 ConsumeMessageOrderlyService ,并调用它的start方法进行启动,在集群模式模式下,start方法中会启动一个定时加锁的任务,周期性的对该消费者负责的消息队列进行加锁.
为什么集群模式下需要加锁? 因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在顺序消费情况下,集群模式下需要对消息队列加锁,当某个消息队列被锁定时,其他的消费者不能进行消费.
加锁的具体逻辑如下,首先获取当前消费者负责的所有消息队列 MessageQueue ,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列: (1)根据broker名称查找broker的详细信息; (2)创建加锁请求,在请求中设置要加锁的消息队列,将请求发送给broker,表示要对这些消息队列进行加锁; (3)Broker返回请求处理结果,响应结果中包含了加锁成功的消息队列,对于加锁成功的消息队列将消息队列 MessageQueue ,将其对应的 ProcessQueue 中的locked属性置为true表示该消息队列已加锁成功,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的 ProcessQueue 对象中的locked属性置为false表示加锁失败; 。
上面可知,在使用顺序消息时,定时任务会周期性的对当前消费者负责的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配了新的消息队列,此时还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果是新分配到当前消费者的消息队列,同样会向Broker发送请求,对 MessageQueue 进行加锁,加锁成功将其对应的 ProcessQueue 中的locked属性置为true才可以拉取消息.
消息拉取成功之后,会将消息提交到线程池中进行处理,对于顺序消费处理逻辑如下:
获取消息队列 MessageQueue 的对象锁,每个 MessageQueue 对应了一把 Object 对象锁,然后使用synchronized进行加锁, 这里加锁的原因是因为顺序消费使用的是线程池,由多个线程同时进行消费,所以某个线程在处理某个消息队列的消息时需要对该消息队列 MessageQueue 加锁,防止其他线程并发消费该消息队列的锁,破坏消息的顺序性 ; 。
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
// 获取消息队列对应的对象锁,也就是一个Object类型的对象
Object objLock = this.mqLockTable.get(mq);
// 如果获取为空
if (null == objLock) {
// 创建对象
objLock = new Object();
// 加入到Map中
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
上一步获取锁成功之后,会再次校验该 MessageQueue 对应的 ProcessQueue 中的锁(locked状态),看是否过期或者已经失效,过期或者失效稍后会重新进行加锁; 。
获取 ProcessQueue 的中的 consumeLock 消费锁,获取成功之后调用消息监听器的 consumeMessage 方法开始消费消费; 。
public class ProcessQueue {
// 消息消费锁
private final Lock consumeLock = new ReentrantLock();
public Lock getConsumeLock() { // 获取消息消费锁
return consumeLock;
}
}
消息消费完毕,释放 ProcessQueue 的 consumeLock 消费锁; 。
方法执行完毕,释放 MessageQueue 对应的 Object 对象锁; 。
在第1步中就已经获取了 MessageQueue 对应的 Object 对象锁对消息队列进行加锁了,那么为什么在第3步消费消息之前还要再加一个消费锁呢?
猜测有可能是在消费者进行负载均衡时,当前消费者负责的消息队列发生变化,可能移除某个消息队列,那么消费者在进行消费的时候就要获取 ProcessQueue 的 consumeLock 消费锁进行加锁,相当于锁住 ProcessQueue ,防止正在消费的过程中, ProcessQueue 被负载均衡移除.
既然如此,负载均衡的时候为什么不使用 MessageQueue 对应的 Object 对象锁进行加锁而要使用 ProcessQueue 中的 consumeLock 消费锁?
这里应该是为了减小锁的粒度,因为消费者在 MessageQueue 对应的 Object 加锁后,还进行了一系列的判断,校验都成功之后获取 ProcessQueue 中的 consumeLock 加锁,之后开始消费消息,消费完毕释放所有的锁,如果负载均衡使用 MessageQueue 的 Object 对象锁需要等待整个过程结束,锁的粒度较粗,这样显然会降低性能,而如果使用消息消费锁,只需要等待第3步和第4步结束就可以获取锁,减少等待的时间,而且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证 consumeMessage 方法在执行的过程中(消息被消费的过程) ProcessQueue 不被移除即可.
总结 。
消费者端,是通过加锁来保证消息的顺序消费,一共有三把锁:
向Broker申请的消息队列锁 集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的 ProcessQueue 中的 locked 变量中.
消息队列锁 对应 MessageQueue 对应的 Object 对象锁,消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前需要对 MessageQueue 加锁,锁住要处理的消息队列,主要是处理多线程之间的竞争,保证消息的顺序性.
消息消费锁 对应 ProcessQueue 中的 consumeLock ,消费者在调用consumeMessage方法之前会加消费锁, 主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除 .
对应的相关源码可参考:
【RocketMQ】【源码】顺序消息实现原理 。
最后此篇关于【RocketMQ】顺序消息实现总结的文章就讲到这里了,如果你想了解更多关于【RocketMQ】顺序消息实现总结的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
1、定义 设 \(u\) 和 \(v\) 为一张图上的任意两个节点。令 \(c(u, v)\) 为它们之间的边的容量, \(f(u, v)\) 为它们之间的流量,则需要满足以
1、前言 工作中涉及到文件系统,有时候需要判断文件和目录是否存在。我结合apue第四章文件和目录,总结一下如何正确判断文件和目录是否存在,方便以后查询。 2、stat系列函数 stat函数用来
并查集(Union-Find Set): 一种用于管理分组的数据结构。它具备两个操作:(1)查询元素a和元素b是否为同一组 (2) 将元素a和b合并为同一组。 注意:并查集不能将在同一组的元素拆
当下,注解非常流行,以前很长篇的代码,现在基本上一个注解就能搞定。 那,在Mybatis中又有哪些注解呢? Mybatis中的注解基本上都在org.apache.ibatis.annotat
指针操作数组,方法一是p+index,方法二是p[index],第二种方法跟数组访问方法是一样的。 数组引用返回的是数组的第一个元素的指针地址。 可以将指针指向数组的任意元素,然后从那里开始访问
通常部署完php环境后会进行一些安全设置,除了熟悉各种php漏洞外,还可以通过配置php.ini来加固PHP的运行环境,PHP官方也曾经多次修改php.ini的默认设置。 下面对php.ini中一
在JavaScript中,使用typeof可以检测基本数据类型,使用instanceof可以检测引用数据类型。在PHP中,也有检测数据类型的方法,具体如下: 1、输出变量的数据类型(gettype
把图片缓存到本地,在很多场景都会用到,如果只是存储文件信息,那建一个plist文件,或者数据库就能很方便的解决问题,但是如果存储图片到沙盒就没那么方便了。这里简单介绍两种保存图片到沙盒的方法。
(1)需要安装docker容器,在docker容器内安装jenkins,gogs,tomcat。 新建maven项目,添加findbugs plugin。 使用docker
今天主题是实现并发服务器,实现方法有多种版本,先从简单的单进程代码实现到多进程,多线程的实现,最终引入一些高级模块来实现并发TCP服务器。 说到TCP,想起吐槽大会有个段子提到三次握手,也只有程序
如下所示: Ctrl+1或F2快速修复 Ctrl+D快捷删除行 Shift+Enter 快速切换到下一行,在本行的任何位置都可 Ctrl+F11快速运行代码 Alt+上下键 快速移动行(可
JSP是Servlet技术的扩展,本质上是Servlet的简易方式,更强调应用的外表表达。 JSP编译后是”类servlet”。 Servlet和JSP最主要的不同点在于,Servlet的应用逻辑
Java中的Runable,Callable,Future,FutureTask,ExecutorService,Excetor,Excutors,ThreadPoolExcetor在这里对这些关键
读取Java文件到byte数组的三种方法(总结) ? 1
用java实现的数组创建二叉树以及递归先序遍历,递归中序遍历,递归后序遍历,非递归前序遍历,非递归中序遍历,非递归后序遍历,深度优先遍历,广度优先遍历8种遍历方式:
1、简明总结 ASCII(char) 返回字符的ASCII码值 BIT_LENGTH(str) 返回字符串的比特长度 CONCAT(s1,s2…,sn)
java应用服务器(web server),是指运行java程序的web应用服务器软件,不包括nginx、Apache等通用web服务器软件。 一、Tomcat Tomcat是Apache 软件基
事务作为抽象层,允许应用忽略DB 内部一些复杂并发问题和某些硬件、软件故障,简化应用层的处理逻辑:事务中止(transaction abort),而应用仅需重试。对复杂访问模式,事务可大大减少需要考虑
我们在本教程学习了如何描述 XML 文档的结构 我们学习到了如何使用 DTD 来定义一个 XML 文档的合法元素,以及如何在我们的 XML 内部或者作为一个外部引用来声明 DTD 我们学习了如何为
在这个XPath 基础教程中我们讲解了如何在 XML 文档中查找信息 我们可以使用 XPath 的元素和属性在 XML 文档中进行导航 我们也学习了如何使用 XPath 中内建的某些标准函数 如
我是一名优秀的程序员,十分优秀!