- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
RocketMQ设定了延迟级别可以让消息延迟消费,延迟消息会使用 SCHEDULE_TOPIC_XXXX 这个主题,每个延迟等级对应一个消息队列,并且与普通消息一样,会保存每个消息队列的消费进度(delayOffset.json中的offsetTable):
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
延迟级别与延迟时间对应关系: 延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费 延迟级别1 ---> 延迟时间5s 延迟级别2 ---> 延迟时间10s ... 以此类推,最大的延迟时间为2h.
使用延迟消息时,只需设定延迟级别即可,Broker在存储的时候会判断是否设定了延迟级别,如果设置了延迟级别就按延迟消息来处理,由 【消息的存储】 文章可知,消息存储之前会进入到 asyncPutMessage 方法中,延迟消息的处理就是在这里做的,处理逻辑如下:
判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延迟级别; 。
获取 RMQ_SYS_SCHEDULE_TOPIC ,它是在 TopicValidator 中定义的常量,值为 SCHEDULE_TOPIC_XXXX
public class TopicValidator {
// ...
public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
}
根据延迟级别选取对应的队列,一般会把相同延迟级别的消息放在同一个队列中; 。
将消息原本的TOPIC和队列ID设置到消息属性中; 。
更改消息队列的主题为 RMQ_SYS_SCHEDULE_TOPIC ,所以延迟消息的主题最终被设置为 RMQ_SYS_SCHEDULE_TOPIC ,会将消息投递到延迟队列中; 。
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ...
// 获取事务类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 如果未使用事务或者提交事务
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 判断延迟级别
if (msg.getDelayTimeLevel() > 0) {
// 如果超过了最大延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 获取RMQ_SYS_SCHEDULE_TOPIC
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 根据延迟级别选取对应的队列
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 将消息原本的TOPIC和队列ID设置到消息属性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置SCHEDULE_TOPIC
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// ...
}
}
延迟消息被投递到延迟队列中之后,会由定时任务去处理队列中的消息,接下来就去看下定时任务的处理过程.
Broker启动的时候会调用 ScheduleMessageService 的 start 方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息,然后从offsetTable中获取当前延迟等级对应那个消息队列的消费进度,如果未获取到,则使用0,从队列的第一条消息开始处理,然后创建定时任务 DeliverDelayedMessageTimerTask ,可以看到首次是延迟1000ms执行:
public class ScheduleMessageService extends ConfigManager {
// 首次执行延迟的时间
private static final long FIRST_DELAY_TIME = 1000L;
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历所有的延迟级别
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) { // 如果获取的消费进度为空
offset = 0L; // 默认为0,从第一条消息开始处理
}
if (timeDelay != null) {
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// 为每个延迟级别创建对应的定时任务
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// ...
}
}
}
DeliverDelayedMessageTimerTask 是 ScheduleMessageService 的内部类,它实现了 Runnable 接口,在run方法中调用了 executeOnTimeup 方法来处理延迟消息:
public class ScheduleMessageService extends ConfigManager {
class DeliverDelayedMessageTimerTask implements Runnable {
@Override
public void run() {
try {
if (isStarted()) {
// 执行任务
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
}
}
}
}
executeOnTimeup 方法的处理逻辑如下:
ConsumeQueue
,如果获取为空,会重新创建一个任务提交到线程池中,延迟时间为DELAY_FOR_A_WHILE,延迟一段时间后重新执行;
public class ScheduleMessageService extends ConfigManager {
class DeliverDelayedMessageTimerTask implements Runnable {
public void executeOnTimeup() {
// 根据主题名称以及延迟等级获取ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 如果ConsumeQueue为空,新建定时任务等待下次执行
if (cq == null) {
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
// 根据偏移量从ConsumeQueue获取数据
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ == null) {
// ...
// 如果获取为空,新建定时任务等待下次执行
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 开始处理延迟消息
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 获取消息在CommitLog中的偏移量
long offsetPy = bufferCQ.getByteBuffer().getLong();
// 消息大小
int sizePy = bufferCQ.getByteBuffer().getInt();
// tag哈希值
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
// 获取消息存储时间戳
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
// 根据延迟等级和消息的存储时间计算消息的到期时间
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
// 获取当前时间
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 计算消息的到期时间
long countdown = deliverTimestamp - now;
// 如果大于0,表示还未到达指定的延迟时间,需要继续等待
if (countdown > 0) {
// 新建定时任务等待下次执行
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
// 走到这里,表示已经到了消息的延迟时间,从CommitLog取出消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
// 处理消息,这里会恢复消息原本的Topic
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
boolean deliverSuc;
// 投递消息到原本的主题中
if (ScheduleMessageService.this.enableAsyncDeliver) {
// 异步投递
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
} else {
// 同步投递
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
}
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
// 计算下一条消息的偏移量
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
}
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody()); // 设置消息体
msgInner.setFlag(msgExt.getFlag()); // 设置falg
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
// ...
msgInner.setWaitStoreMsgOK(false);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
// 恢复原本的Topic
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);
return msgInner;
}
}
最后此篇关于【RocketMQ】【源码】延迟消息实现原理的文章就讲到这里了,如果你想了解更多关于【RocketMQ】【源码】延迟消息实现原理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
什么是RocketMQ RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。 常见的MQ主要有
1、说说你们公司线上生产环境用的是什么消息中间件? 见【2、多个mq如何选型?】 2、多个mq如何选型? MQ 描述 Rabb
消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求 ConsumeReques
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
RocketMQ支持集群部署来保证高可用。它基于主从模式,将节点分为Master、Slave两个角色,集群中可以有多个Master节点,一个Master节点可以有多个Slave节点。Master节点
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
RocketMQ是通过 DefaultMQProducer 进行消息发送的,它实现了 MQProducer 接口, MQProducer 接口中定义了消息发送的方法,方法主要分为三大类:
当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。 数据校验 封装消息 首先Broker会创
在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的。 RocketMQ消息的消费以组为单位
NameServer是一个注册中心,提供服务注册和服务发现的功能。NameServer可以集群部署,集群中每个节点都是对等的关系(没有像ZooKeeper那样在集群中选举出一个Master节点),节
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
消息存储 在 【RocketMQ】消息的存储 一文中提到,Broker收到消息后会调用 CommitLog 的asyncPutMessage方法写入消息,在DLedger模式下使用的是
RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡.
RocketMQ有两种获取消息的方式,分别为推模式和拉模式。 推模式 推模式在 【RocketMQ】消息的拉取 一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者
主从同步的实现逻辑主要在 HAService 中,在 DefaultMessageStore 的构造函数中,对 HAService 进行了实例化,并在start方法中,启动了 HAService :
在 【RocketMQ】消息的拉取 一文中可知,消费者在启动的时候,会创建消息拉取API对象 PullAPIWrapper ,调用pullKernelImpl方法向Broker发送拉取消息的
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息; 预设值的延迟时间间隔为: 1s、 5s、 10s、 30s、 1m
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。 其中,level=0 级表示不延时,level=1 表示 1 级延时,le
我尝试给 rockerMQ broker 加注星标,但我收到了错误消息: There is insufficient memory for the Java Runtime Environment t
我是一名优秀的程序员,十分优秀!