- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
提到主从复制,我们可能立马会联想到 MySQL 的主从复制.
MySQL 主从复制是 MySQL 高可用机制之一,数据可以从数据库服务器主节点复制到一个或多个从节点.
这篇文章,我们聊聊 RocketMQ 的主从复制,希望你读完之后,能够理解主从复制的精髓.
在 RocketMQ 的集群模式中,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master.
每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server.
Master 节点负责接收客户端的写入请求,并将消息持久化到磁盘上。而 Slave 节点则负责从 Master 节点复制消息数据,并保持与 Master 节点的同步.
生产者发送消息后,Master 接收到存储消息请求,将消息数据同步给 Slave 后,才将存储结果返回给生产者。同步复制模式下,发送消息会有一定延迟,系统吞吐量也会降低.
生产者发送消息后,Master 接收到存储消息请求,将消息存储后,直接将存储结果返回给生产者。 Master 和 Slave 再通过异步的方式同步数据,这种复制模式具有较小的延迟,可以实现比较高的吞吐量.
若 Master 出现故障,有些数据可能未写入 Slave ,未同步的数据可能丢失.
复制流程分为两个部分: 元数据复制 和 消息数据复制 .
Slave Broker 定时任务每隔 10 秒会同步元数据,包括 主题 , 消费进度 , 延迟消费进度 , 消费者配置 .
同步主题时, Slave Broker 向 Master Broker 发送 RPC 请求,返回数据后,首先加入本地缓存里,然后持久化到本地.
下图是 Master 和 Slave 消息数据同步的流程图.
1、Master 启动后监听指定端口; 。
Master 启动后创建 AcceptSocketService 服务 , 用来创建客户端到服务端的 TCP 链接.
RocketMQ 抽象了链接对象 HAConnection , HAConnection 会启动两个线程,分别用于读服务和写服务:
2、Slave 启动后,尝试连接 Master ,建立 TCP 连接; 。
HAClient 是客户端 Slave 的核心类 ,负责和 Master 创建连接和数据交互.
客户端在启动后,首先尝试连接 Master , 查询当前消息存储中最大的物理偏移量 ,并存储在变量 currentReportedOffset 里.
3、Slave 判定拉取间隔是否大于 5 秒,则向 Master 汇报已拉取消息偏移量; 。
上报进度的数据格式是一个 Long 类型的 Offset , 8个字节 , 非常简洁 .
发送到 Socket 缓冲区后 , 修改最后一次的写时间 lastWriteTimestamp .
4、Master 解析请求偏移量,从消息文件中检索该偏移量后的所有消息; 。
当 Slave 上报数据到 Master 时, 触发 SelectionKey.OP_READ 事件 ,Master 将请求交由 ReadSocketService 服务处理:
当 Slave Broker 传递了自身 commitlog 的 maxPhyOffset 时,Master 会马上中断 selector.select(1000) ,执行 processReadEvent 方法.
processReadEvent 方法的核心逻辑是设置 Slave 的当前进度 offset ,然后通知复制线程当前的复制进度.
写服务 WriteSocketService 从消息文件中检索该偏移量后的所有消息,并将消息数据发送给 Slave.
5、Slave 接收到数据,将消息数据 append 到消息文件 commitlog 里 .
首先 HAClient 类中调用 dispatchReadRequest 方法 , 解析出消息数据 ; 。
然后将消息数据 append 到本地的消息存储.
从数据复制流程图,我们发觉数据复制本身就是一个异步执行的,但是同步是如何实现的呢?
Master Broker 接收到写入消息的请求后 ,调用 Commitlog 的 aysncPutMessage 方法写入消息.
这段代码中,当 commitLog 执行完 appendMessage 后, 需要执行 刷盘任务 和 同步复制 两个任务.
但这两个任务并不是同步执行,而是异步的方式, 使用了 CompletableFuture 这个异步神器 .
当 HAConnection 读服务接收到 Slave 的进度反馈,发现消息数据复制成功,则唤醒 future .
最后 Broker 组装响应命令 ,并将响应命令返回给客户端.
1、主从复制包含元数据复制和消息数据复制两个部分; 。
2、元数据复制 。
Slave Broker 定时任务每隔 10 秒向 Master Broker 发送 RPC 请求,将元数据同步到缓存后,然后持久化到磁盘里; 。
3、消息数据复制 。
4、同步的实现 。
当 commitLog 执行完 appendMessage 后, 需要执行 刷盘任务 和 同步复制 两个任务,这里用到了 CompletableFuture 这个异步神器.
当 HAConnection 读服务接收到 Slave 的进度反馈,发现消息数据复制成功,则唤醒 future 。最后 Broker 组装响应命令 ,并将响应命令 返回给客户端 .
如果我的文章对你有所帮助,还请帮忙 点赞、在看、转发 一下,你的支持会激励我输出更高质量的文章,非常感谢! 。
最后此篇关于聊聊RocketMQ主从复制的文章就讲到这里了,如果你想了解更多关于聊聊RocketMQ主从复制的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在编写一个应用程序,允许用户创建一个“问卷”,然后向其中添加问题。我正在使用核心数据来存储信息。我创建了一个问卷实体,并与问题实体建立了“一对多”关系。我的问题是,如果要允许用户复制(复制)整个调
有没有办法复制或复制 SharedPreference?或者我需要从一个变量中获取每个变量,然后将它们放入另一个变量中吗? 最佳答案 尝试这样的事情: //sp1 is the shared pref
下面的(A)和(B)有区别吗? (假设 NON ARC,如果重要的话) // --- (A) --- @interface Zoo : NSObject{} @property (copy) Dog
我正在尝试将 mysql SELECT 查询保存到文件中,如下所示: $result = mysqli_query($db,$sql); $out = fopen('tmp/csv.csv', 'w'
我需要创建一个 CVPixelBufferRef 的副本,以便能够使用副本中的值以按位方式操作原始像素缓冲区。我似乎无法使用 CVPixelBufferCreate 或 CVPixelBufferCr
我在 Source 文件夹中有一个 Active wave 录音 wave-file.wav。我需要使用新名称 wave-file-copy.wav 将此文件复制到 Destination 文件夹。
在使用 GNU Autotools 构建的项目中,我有一个脚本需要通过 make 修改以包含安装路径。这是一个小例子: configure.ac: AC_INIT(foobar, 1.0) AC_PR
我想将 SQL 的行复制到同一个表中。但是在我的表中,我有一个“文本”列。 使用此 SQL: CREATE TEMPORARY TABLE produit2 ENGINE=MEMORY SELECT
谁能给我解释一下 df2 = df1 df2 = df1.copy() df3 = df1.copy(deep=False) 我已经尝试了所有选项并执行了以下操作: df1 = pd.DataFram
Hazelcast 是否具有类似于 Ehcache 的复制? http://www.ehcache.org/generated/2.9.0/pdf/Ehcache_Replication_Guide.
我有以下拓扑。一个 Ubuntu 16.04。运行我的全局 MySQL 服务器的 Amazon AWS 上的实例。我想将此服务器用作许多本地主服务器(Windows 机器 MySQL 服务器)的从服务
使用 SQLyog,我正在测试表中是否设置了正确的值。我尝试过 SELECT type_service FROM service WHERE email='test@gmail.com' 因此,只输出
有人可以提供一些关于如何配置 ElasticSearch 进行复制的说明。我在 Windows 中运行 ES,并且了解如果我在同一台服务器上多次运行 bat 文件,则会启动一个单独的 ES 实例,并且
一 点睛 ThreadGroup 复制线程的两个方法。 public int enumerate(Thread list[]) // 会将 ThreadGroup 中的 active 线程全部复制到
一 点睛 ThreadGroup 复制线程组的两个方法。 public int enumerate(ThreadGroup list[]) // 相对于 enumerate(list,true) pu
官方documentation Cassandra 说: Configure the keyspace and create the new datacenter: Use ALTER KEYSPAC
This question already has answers here: How to weight smoothing by arbitrary factor in ggplot2? (2个答
我们有一个表格来表明对各种俱乐部的兴趣。输出将数据记录在 Excel 电子表格中,其中列有他们的首选姓名、姓氏、电子邮件、代词,以及他们感兴趣的俱乐部的相应列中的“1”(下面的模型)。 我们希望为俱乐
This question already has answers here: Closed 8 years ago. Possible Duplicate: In vim, how do I get
如何复制形状及其所在的单元格?当我手动复制时,形状会跟随单元格,但是当我使用宏进行复制时,我会得到除形状之外的所有其他内容。 Cells(sourceRow, sourceColumn).Copy C
我是一名优秀的程序员,十分优秀!