- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程.
首先Broker会创建一个 MessageExtBrokerInner 对象封装从请求中解析到的消息数据,它会将Topic信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到 MessageExtBrokerInner 中,后续都使用这个 MessageExtBrokerInner 对象来操纵消息.
接下来会判断是否开启事务,开启事务与未开启事务时调用的方法不一致,这里以未开启事务为例,看下消息的持久化过程.
在存储消息之前,需要对消息进行一系列的校验,保证收到的消息有效合法.
主要对Broker是否可以写入消息进行检查,包含以下几个方面:
主要是对主题的长度校验和消息属性的长度校验.
主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量.
对消息进行校验完毕之后,就可以对消息进行写入了.
前面说到Broker将收到的消息封装为了 MessageExtBrokerInner 对象,这里会新增以下设置:
(1)设置消息存储的时间(当前时间); (2)计算消息体的CRC值,并设置到对应的成员变量中; (3)如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识; 。
RocketMQ会将消息数据先写入内存buffer,写入之前还会做一些校验: (1)对消息属性数据的长度进行校验判断是否超过限定值; (2)对消息整体内容长度进行校验,判断是否超过最大的长度限制; 。
校验通过之后,会根据消息总体内容的长度对buffer进行初始化,也就是根据需要的大小申请一块内存区域,开始写入以下数据:
整体存储格式如下:
RocketMQ将每一条消息存储到CommitLog文件中,存储文件的根目录由配置参数 storePathRootDir 决定:
默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文件名,小于20位用0补齐.
比如第一个文件中第一条消息的偏移量为0,那么第一个文件的名称为00000000000000000000,当这个文件存满之后,需要重新建立一个CommitLog文件,一个文件大小为1G, 1GB = 1024*1024*1024 = 1073741824 Bytes ,所以下一个文件就会被命名为00000000001073741824.
在持久化消息之前,需要知道消息要写入哪个CommitLog文件,RocketMQ通过一个队列(对应 MappedFileQueue )存储了记录了所有的CommitLog文件(对应 MappedFile ),并提供了相关方法获取到当前正在使用的那个CommitLog.
mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的那个目录。 MappedFile可以看做是每一个Commitlog文件的映射对象,每一个CommitLog对于一个MappedFile对象.
如果获取到的CommitLog取为空或者已写满,可能是首次写入消息还未创建文件或者上一次写入的文件已达到规定的大小(1G),此时会新建一个CommitLog文件.
需要注意,在获取CommitLog之前会加锁,一是防止在多线程情况下创建多个CommitLog,二是接下来要往CommitLog中写入消息内容,防止多线程情况下数据错乱.
知道要写入哪个CommitLog之后,就可以将之前已经写入缓冲区buffer的消息数据写入到CommitLog了.
RocketMQ提供了两种方式进行写入:
(1) 通过暂存池将数据写入缓冲区 在开启暂存池时,会先将数据都写入字节缓冲区 ByteBuffer 中, ByteBuffer 在申请内存时,可以申请JVM堆内内存( HeapByteBuffer ),也可以申请堆外内存( DirectByteBuffer ),RocketMQ使用的是堆外内存 DirectByteBuffer .
暂存池 类似线程池,只不过池中存放的是提前申请好的内存( ByteBuffer ),RocketMQ会预先申请一些内存,从源码中可以看到申请的是堆外内存,然后放入池中,需要用时从池中获取,使用完毕后会归还到池中.
暂存池的开启条件 需要同时满足以下三个条件时才会开启暂存池:
SLAVE
; 从条件3中可以看出异步刷盘时才可以开启暂存池的使用,因为异步刷盘,很有可能是积攒了一批消息,需要同时刷入磁盘,所以使用暂存池可以将之前写入的消息先暂存在内存缓冲区中,等待执行刷盘时,将积攒的消息一起刷入磁盘中,而同步刷盘由于每次写入完毕之后要立刻刷回磁盘,那么就没有必要使用暂存池缓存数据了.
(2) 通过文件映射 未开启暂存池时使用文件映射,使用 MappedByteBuffer 映射对应的CommitLog文件, MappedByteBuffer 是ByteBuffer的子类,它可以将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,对文件内容进行操作。 使用 MappedByteBuffer 可以减少数据的拷贝,详细内容可参考 【Java】Java中的零拷贝 .
消息写入流程 。
了解了写入方式之后,来看下消息的写入流程:
CommitLog对应的 MappedFile 对象中记录了当前文件的写入位置,首先会判断准备写入的位置是否小于文件总大小,如果小于意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息; 。
判断是否开启暂存池,开启暂存池时使用 MappedFile 中的 ByteBuffer 来开辟共享内存,否则使用 MappedFile 中的; MappedByteBuffer 来开辟.
开辟共享内存之后,往共享内存中写入的数据,会影响到开辟它那个 ByteBuffer 或者 MappedByteBuffer 中; 。
将之前已经写入缓冲区的消息数据写入到开辟的共享内存中; 。
返回消息写入结果,有以下几种状态:
需要注意,此时消息驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容刷入到硬盘中.
RocketMQ消息存储相关源码可参考: 【RocketMQ】【源码】消息的存储 。
在以上的消息写入步骤完成之后,会进行刷盘操作.
有两种刷盘策略:
同步刷盘 :表示消息写入到内存之后需要立刻刷到磁盘文件中.
异步刷盘 :表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险.
不管同步刷盘还是异步刷盘,都是唤醒对应的刷盘线程来进行,这里不对唤醒的具体过程进行讲解,如果想了解可参考 【RocketMQ】【源码】消息的刷盘机制 .
前面讲到,暂存池只有在异步刷盘时才可以启用,所以设置为同步刷盘时,使用的是文件映射的方式写入数据,在同步刷盘时直接通过 MappedByteBuffer 的 force 方法将数据flush到磁盘文件即可.
异步刷盘有开启暂存池和未开启两种情况.
开启暂存池时,可以分为Commit和Flush两个阶段.
(1)Commit阶段 。
在开启暂存池时,数据会先写入缓冲区 ByteBuffer 中,并未映射到CommitLog文件中,所以首先会唤醒Commit线程,将 ByteBuffer 中的数据写入到CommitLog对应的 FileChannel 中.
(2)Flush阶段 。
数据被写入 FileChannel 之后,就会唤醒Flush线程,再调用 FileChannel 的force方法将数据flush到磁盘.
未开启暂存池时使用文件映射的方式,直接唤醒Flush线程,调用 MappedByteBuffer 的 force 方法将数据flush到磁盘文件即可.
通过上面分析消息的持久化过程,来看下RocketMQ提升性能的一些地方.
(1)RocketMQ在写入数据到CommitLog时, 采用的是顺序写的方式 ,顺序写比随机写文件效率要高很多.
(2)在异步刷盘时,可以使用暂存池,暂存池会提前申请好内存,申请内存是一个比较重的操作,所以避免在消息写入时申请内存,以此提高效率.
(3)RocketMQ 使用了 MappedByteBuffer 文件映射的方式,向CommitLog写入数据,可以减少数据的拷贝过程.
参考 。
RocketMQ官方文档 。
郭慕荣-RocketMQ消息存储原理总结(一) 。
最后此篇关于【RocketMQ】消息的存储总结的文章就讲到这里了,如果你想了解更多关于【RocketMQ】消息的存储总结的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我一直在读到,如果一个集合“被释放”,它也会释放它的所有对象。另一方面,我还读到,一旦集合被释放,集合就会释放它的对象。 但最后一件事可能并不总是发生,正如苹果所说。系统决定是否取消分配。在大多数情况
我有一个客户端-服务器应用程序,它使用 WCF 进行通信,并使用 NetDataContractSerializer 序列化对象图。 由于服务器和客户端之间传输了大量数据,因此我尝试通过微调数据成员的
我需要有关 JMS 队列和消息处理的帮助。 我有一个场景,需要针对特定属性组同步处理消息,但可以在不同属性组之间同时处理消息。 我了解了特定于每个属性的消息组和队列的一些知识。我的想法是,我想针对
我最近开始使用 C++,并且有一种强烈的冲动 #define print(msg) std::cout void print(T const& msg) { std::cout void
我已经为使用 JGroups 编写了简单的测试。有两个像这样的简单应用程序 import org.jgroups.*; import org.jgroups.conf.ConfiguratorFact
这个问题在这里已经有了答案: Firebase messaging is not supported in your browser how to solve this? (3 个回答) 7 个月前关
在我的 C# 控制台应用程序中,我正在尝试更新 CRM 2016 中的帐户。IsFaulted 不断返回 true。当我向下钻取时它返回的错误消息如下: EntityState must be set
我正在尝试通过 tcp 将以下 json 写入 graylog 服务器: {"facility":"GELF","file":"","full_message":"Test Message Tcp",
我正在使用 Django 的消息框架来指示成功的操作和失败的操作。 如何排除帐户登录和注销消息?目前,登录后登陆页面显示 已成功登录为“用户名”。我不希望显示此消息,但应显示所有其他成功消息。我的尝试
我通过编写禁用qDebug()消息 CONFIG(release, debug|release):DEFINES += QT_NO_DEBUG_OUTPUT 在.pro文件中。这很好。我想知道是否可以
我正在使用 ThrottleRequest 来限制登录尝试。 在 Kendler.php 我有 'throttle' => \Illuminate\Routing\Middleware\Throttl
我有一个脚本,它通过die引发异常。捕获异常时,我想输出不附加位置信息的消息。 该脚本: #! /usr/bin/perl -w use strict; eval { die "My erro
允许的消息类型有哪些(字符串、字节、整数等)? 消息的最大大小是多少? 队列和交换器的最大数量是多少? 最佳答案 理论上任何东西都可以作为消息存储/发送。实际上您不想在队列上存储任何内容。如果队列大部
基本上,我正在尝试创建一个简单的 GUI 来与 Robocopy 一起使用。我正在使用进程打开 Robocopy 并将输出重定向到文本框,如下所示: With MyProcess.StartI
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
我得到了一个带有 single_selection 数据表和一个命令按钮的页面。命令按钮调用一个 bean 方法来验证是否进行了选择。如果不是,它应该显示一条消息警告用户。如果进行了选择,它将导航到另
我知道 MSVC 可以通过 pragma 消息做到这一点 -> http://support.microsoft.com/kb/155196 gcc 是否有办法打印用户创建的警告或消息? (我找不到谷
当存在大量节点或二进制数据时, native Erlang 消息能否提供合理的性能? 情况 1:有一个大约 50-200 台机器的动态池(erlang 节点)。它在不断变化,每 10 分钟大约添加或删
我想知道如何在用户登录后显示“欢迎用户,您已登录”的问候消息,并且该消息应在 5 秒内消失。 该消息将在用户成功登录后显示一次,但在同一 session 期间连续访问主页时不会再次显示。因为我在 ho
如果我仅使用Welcome消息,我的代码可以正常工作,但是当打印p->client_name指针时,消息不居中。 所以我的问题是如何将消息和客户端名称居中,就像它是一条消息一样。为什么它目前仅将消
我是一名优秀的程序员,十分优秀!