- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
主从同步的实现逻辑主要在 HAService 中,在 DefaultMessageStore 的构造函数中,对 HAService 进行了实例化,并在start方法中,启动了 HAService :
public class DefaultMessageStore implements MessageStore {
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 初始化HAService
this.haService = new HAService(this);
} else {
this.haService = null;
}
// ...
}
public void start() throws Exception {
// ...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 启动HAService
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
// ...
}
}
在 HAService 的构造函数中,创建了 AcceptSocketService 、 GroupTransferService 和 HAClient ,在start方法中主要做了如下几件事:
AcceptSocketService
的beginAccept方法,这一步 主要是进行端口绑定,在端口上监听从节点的连接请求 (可以看做是运行在master节点的); AcceptSocketService
的start方法启动服务,这一步 主要为了处理从节点的连接请求,与从节点建立连接 (可以看做是运行在master节点的); GroupTransferService
的start方法, 主要用于在主从同步的时候,等待数据传输完毕 (可以看做是运行在master节点的); HAClient
的start方法启动, 里面与master节点建立连接,向master汇报主从同步进度并存储master发送过来的同步数据 (可以看做是运行在从节点的);
public class HAService {
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// 创建AcceptSocketService
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
// 创建HAClient
this.haClient = new HAClient();
}
public void start() throws Exception {
// 开始监听从服务器的连接
this.acceptSocketService.beginAccept();
// 启动服务
this.acceptSocketService.start();
// 启动GroupTransferService
this.groupTransferService.start();
// 启动
this.haClient.start();
}
}
AcceptSocketService 的 beginAccept 方法里面首先获取了 ServerSocketChannel ,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:
public class HAService {
class AcceptSocketService extends ServiceThread {
/**
* 监听从节点的连接
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
// 创建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
// 获取selector
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 绑定端口
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 设置非阻塞
this.serverSocketChannel.configureBlocking(false);
// 注册OP_ACCEPT连接事件的监听
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
}
}
AcceptSocketService 的run方法中,对监听到的连接请求进行了处理,处理逻辑大致如下:
OP_ACCEPT
连接事件,创建与从节点的连接对象 HAConnection
,与从节点建立连接,然后调用 HAConnection
的start方法进行启动,并创建的 HAConnection
对象加入到连接集合中, HAConnection中封装了Master节点和从节点的数据同步逻辑 ;
public class HAService {
class AcceptSocketService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服务未停止
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 获取监听到的事件
Set<SelectionKey> selected = this.selector.selectedKeys();
// 处理事件
if (selected != null) {
for (SelectionKey k : selected) {
// 如果是连接事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 创建HAConnection,建立连接
HAConnection conn = new HAConnection(HAService.this, sc);
// 启动
conn.start();
// 添加连接
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
GroupTransferService 的run方法主要是为了在进行主从数据同步的时候,等待从节点数据同步完毕.
在运行时首先进会调用 waitForRunning 进行等待,因为此时可能还有没有开始主从同步,所以先进行等待,之后如果有同步请求,会唤醒该线程,然后调用 doWaitTransfer 方法等待数据同步完成:
public class HAService {
class GroupTransferService extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服务未停止
while (!this.isStopped()) {
try {
// 等待运行
this.waitForRunning(10);
// 如果被唤醒,调用doWaitTransfer等待主从同步完成
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
在看 doWaitTransfer 方法之前,首先看下是如何判断有数据需要同步的.
Master节点中,当消息被写入到CommitLog以后,会调用 submitReplicaRequest 方法处主从同步,首先判断当前Broker的角色是否是SYNC_MASTER,如果是则会构建消息提交请求 GroupCommitRequest ,然后调用 HAService 的 putRequest 添加到请求集合中,并唤醒 GroupTransferService 中在等待的线程:
public class CommitLog {
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
// 构建GroupCommitRequest
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
// 添加请求
service.putRequest(request);
// 唤醒GroupTransferService中在等待的线程
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
在 doWaitTransfer 方法中,会判断CommitLog提交请求集合 requestsRead 是否为空,如果不为空,表示有消息写入了CommitLog,Master节点需要等待将数据传输给从节点:
public class HAService {
// CommitLog提交请求集合
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
class GroupTransferService extends ServiceThread {
private void doWaitTransfer() {
// 如果CommitLog提交请求集合不为空
if (!this.requestsRead.isEmpty()) {
// 处理消息提交请求
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
// 判断传输到从节点最大偏移量是否超过了请求中设置的偏移量
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
// 获取截止时间
long deadLine = req.getDeadLine();
// 如果从节点还未同步完毕并且未超过截止时间
while (!transferOK && deadLine - System.nanoTime() > 0) {
// 等待
this.notifyTransferObject.waitForRunning(1000);
// 判断从节点同步的最大偏移量是否超过了请求中设置的偏移量
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
// 唤醒
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead = new LinkedList<>();
}
}
}
}
HAClient可以看做是在从节点上运行的,主要进行的处理如下:
connectMaster
方法连接Master节点,Master节点上也会运行,但是它本身就是Master没有可连的Master节点,所以可以忽略; isTimeToReportOffset
方法判断是否需要向Master节点汇报同步偏移量,如果需要则调用 reportSlaveMaxOffset
方法将当前的消息同步偏移量发送给Master节点; processReadEvent
处理网络请求中的可读事件, 也就是处理Master发送过来的消息,将消息存入CommitLog ;
public class HAService {
class HAClient extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 连接Master节点
if (this.connectMaster()) {
// 是否需要报告消息同步偏移量
if (this.isTimeToReportOffset()) {
// 向Master节点发送同步偏移量
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
// 处理读事件,也就是Master节点发送的数据
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// ...
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
connectMaster方法中会获取Master节点的地址,并转换为SocketAddress对象,然后向Master节点请求建立连接,并在selector注册OP_READ可读事件监听:
public class HAService {
class HAClient extends ServiceThread {
// 当前的主从复制进度
private long currentReportedOffset = 0;
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
// 将地址转为SocketAddress
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 连接master
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 注册OP_READ可读事件监听
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 获取CommitLog中当前最大的偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 更新上次写入时间
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
}
在 isTimeToReportOffset 方法中,首先获取当前时间与上一次进行主从同步的时间间隔interval,如果时间间隔interval大于配置的发送心跳时间间隔,表示需要向Master节点发送从节点消息同步的偏移量,接下来会调用 reportSlaveMaxOffset 方法发送同步偏移量, 也就是说从节点会定时向Master节点发送请求,反馈CommitLog中同步消息的偏移量 :
public class HAService {
class HAClient extends ServiceThread {
// 当前从节点已经同步消息的偏移量大小
private long currentReportedOffset = 0;
private boolean isTimeToReportOffset() {
// 获取距离上一次主从同步的间隔时间
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
// 判断是否超过了配置的发送心跳包时间间隔
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
// 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8); // 设置数据传输大小为8个字节
this.reportOffset.putLong(maxOffset);// 设置同步偏移量
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
// 向Master节点发送拉取偏移量
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
// 更新发送时间
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
}
}
processReadEvent 方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断:
dispatchReadRequest
方法进行处理;
class HAClient extends ServiceThread {
// 读缓冲区,会将从socketChannel读入缓冲区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
// 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 处理数据
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
// 记录读取到空数据的次数
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
}
dispatchReadRequest 方法中会将从节点读取到的数据写入CommitLog, dispatchPosition 记录了已经处理的数据在读缓冲区中的位置,从读缓冲区 byteBufferRead 获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。 对数据的处理逻辑如下:
class HAClient extends ServiceThread {
// 已经处理的数据在读缓冲区中的位置,初始化为0
private int dispatchPosition = 0;
// 读缓冲区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean dispatchReadRequest() {
// 消息头大小
final int msgHeaderSize = 8 + 4; // phyoffset + size
// 开启循环不断读取数据
while (true) {
// 获可读取的字节数
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// 如果字节数大于一个消息头的字节数
if (diff >= msgHeaderSize) {
// 获取消息在master节点的物理偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 获取消息体大小
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
// 获取从节点当前CommitLog的最大物理偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
// 如果不一致结束处理
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// 如果可读取的字节数大于一个消息头的字节数 + 消息体大小
if (diff >= (msgHeaderSize + bodySize)) {
// 将度缓冲区的数据转为字节数组
byte[] bodyData = byteBufferRead.array();
// 计算消息体在读缓冲区中的起始位置
int dataStart = this.dispatchPosition + msgHeaderSize;
// 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中
HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
// 更新dispatchPosition的值为消息头大小+消息体大小
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
}
HAConnection中封装了Master节点与从节点的网络通信处理,分别在 ReadSocketService 和 WriteSocketService 中.
ReadSocketService 启动后处理监听到的可读事件,前面知道HAClient中从节点会定时向Master节点汇报从节点的消息同步偏移量,Master节点对汇报请求的处理就在这里,如果从网络中监听到了可读事件,会调用 processReadEvent 处理读事件:
public class HAConnection {
class ReadSocketService extends ServiceThread {
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 处理可读事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
// ...
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// ...
HAConnection.log.info(this.getServiceName() + " service end");
}
}
}
processReadEvent 中从网络中处理读事件的方式与上面 HAClient 的 dispatchReadRequest 类似,都是将网络中的数据读取到读缓冲区中,并用一个变量记录已读取数据的位置, processReadEvent 方法的处理逻辑如下:
public class HAConnection {
class ReadSocketService extends ServiceThread {
// 读缓冲区
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 读缓冲区中已经处理的数据位置
private int processPosition = 0;
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 如果没有可读数据
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
// 处理位置置为0
this.processPosition = 0;
}
// 如果数据未读取完毕
while (this.byteBufferRead.hasRemaining()) {
try {
// 从socketChannel读取数据到byteBufferRead中,返回读取到的字节数
int readSize = this.socketChannel.read(this.byteBufferRead);
// 如果读取数据字节数大于0
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 获取上次处理读事件的时间戳
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// 判断剩余可读取的字节数是否大于等于8
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 获取偏移量内容的结束位置
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 从结束位置向前读取8个字节得到从点发送的同步偏移量
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 更新处理位置
this.processPosition = pos;
// 更新slaveAckOffset为从节点发送的同步进度
HAConnection.this.slaveAckOffset = readOffset;
// 如果记录的从节点的同步进度小于0,表示还未进行同步
if (HAConnection.this.slaveRequestOffset < 0) {
// 更新为从节点发送的同步进度
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
} else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
// 如果从节点发送的拉取偏移量比当前Master节点的最大物理偏移量还要大
log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
HAConnection.this.clientAddr,
HAConnection.this.slaveAckOffset,
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
return false;
}
// 更新Master节点记录的向从节点同步消息的偏移量
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0)
// 判断连续读取到空数据的次数是否超过三次
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
}
前面在GroupTransferService中可以看到是通过push2SlaveMaxOffset的值判断本次同步是否完成的,在notifyTransferSome方法中可以看到当Master节点收到从节点反馈的消息拉取偏移量时,对push2SlaveMaxOffset的值进行了更新:
public class HAService {
// 向从节点推送的消息最大偏移量
private final GroupTransferService groupTransferService;
public void notifyTransferSome(final long offset) {
// 如果传入的偏移大于push2SlaveMaxOffset记录的值,进行更新
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
// 更新向从节点推送的消息最大偏移量
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
}
WriteSocketService 用于Master节点向从节点发送同步消息,处理逻辑如下:
根据从节点发送的主从同步消息拉取偏移量 slaveRequestOffset 进行判断:
slaveRequestOffset
值为-1,表示还未收到从节点报告的同步偏移量,此时睡眠一段时间等待从节点发送消息拉取偏移量; slaveRequestOffset
值不为-1,表示已经开始进行主从同步进行下一步; 判断 nextTransferFromWhere 值是否为-1, nextTransferFromWhere记录了下次需要传输的消息在CommitLog中的偏移量 ,如果值为-1表示初次进行数据同步,此时有两种情况:
判断上次写事件是否已经将数据都写入到从节点 。
根据nextTransferFromWhere从CommitLog中获取消息,如果未获取到消息,等待100ms,如果获取到消息,从CommitLog中获取消息进行传输: (1)如果获取到消息的字节数大于最大传输的大小,设置最最大传输数量,分批进行传输; (2)更新下次传输的偏移量地址也就是nextTransferFromWhere的值; (3)从CommitLog中获取的消息内容设置到将读取到的消息数据设置到selectMappedBufferResult中; (4)设置消息头信息,包括消息头字节数、拉取消息的偏移量等; (5)调用transferData发送数据; 。
public class HAConnection {
class WriteSocketService extends ServiceThread {
private final int headerSize = 8 + 4;// 消息头大小
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 如果slaveRequestOffset为-1,表示还未收到从节点报告的拉取进度
if (-1 == HAConnection.this.slaveRequestOffset) {
// 等待一段时间
Thread.sleep(10);
continue;
}
// 初次进行数据同步
if (-1 == this.nextTransferFromWhere) {
// 如果拉取进度为0
if (0 == HAConnection.this.slaveRequestOffset) {
// 从master节点最大偏移量从开始传输
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
// 更新nextTransferFromWhere
this.nextTransferFromWhere = masterOffset;
} else {
// 根据从节点发送的偏移量开始数据同步
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 判断上次传输是否完毕
if (this.lastWriteOver) {
// 获取当前时间距离上次写入数据的时间间隔
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 如果距离上次写入数据的时间间隔超过了设置的心跳时间
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// 构建header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
// 发送心跳包
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
// 未传输完毕,继续上次的传输
this.lastWriteOver = this.transferData();
// 如果依旧未完成,结束本次处理
if (!this.lastWriteOver)
continue;
}
// 根据偏移量获取消息数据
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {// 获取消息不为空
// 获取消息内容大小
int size = selectResult.getSize();
// 如果消息的字节数大于最大传输的大小
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
// 设置为最大传输大小
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
// 更新下次传输的偏移量地址
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
// 将读取到的消息数据设置到selectMappedBufferResult
this.selectMappedBufferResult = selectResult;
// 设置消息头
this.byteBufferHeader.position(0);
// 设置消息头大小
this.byteBufferHeader.limit(headerSize);
// 设置偏移量地址
this.byteBufferHeader.putLong(thisOffset);
// 设置消息内容大小
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 发送数据
this.lastWriteOver = this.transferData();
} else {
// 等待100ms
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
// ...
HAConnection.log.info(this.getServiceName() + " service end");
}
}
}
transferData 方法的处理逻辑如下:
public class HAConnection {
class WriteSocketService extends ServiceThread {
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// 写入消息头
while (this.byteBufferHeader.hasRemaining()) {
// 发送消息头数据
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
// 记录发送时间
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// 消息头数据发送完毕之后,发送消息内容
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
// 发送消息内容
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
// ...
return result;
}
}
}
总结 。
主从同步流程 。
有新消息写入之后的同步流程 。
参考 丁威、周继锋《RocketMQ技术内幕》 。
RocketMQ版本:4.9.3 。
最后此篇关于【RocketMQ】主从同步实现原理的文章就讲到这里了,如果你想了解更多关于【RocketMQ】主从同步实现原理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
什么是RocketMQ RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。 常见的MQ主要有
1、说说你们公司线上生产环境用的是什么消息中间件? 见【2、多个mq如何选型?】 2、多个mq如何选型? MQ 描述 Rabb
消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求 ConsumeReques
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
RocketMQ支持集群部署来保证高可用。它基于主从模式,将节点分为Master、Slave两个角色,集群中可以有多个Master节点,一个Master节点可以有多个Slave节点。Master节点
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
RocketMQ是通过 DefaultMQProducer 进行消息发送的,它实现了 MQProducer 接口, MQProducer 接口中定义了消息发送的方法,方法主要分为三大类:
当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。 数据校验 封装消息 首先Broker会创
在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的。 RocketMQ消息的消费以组为单位
NameServer是一个注册中心,提供服务注册和服务发现的功能。NameServer可以集群部署,集群中每个节点都是对等的关系(没有像ZooKeeper那样在集群中选举出一个Master节点),节
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
消息存储 在 【RocketMQ】消息的存储 一文中提到,Broker收到消息后会调用 CommitLog 的asyncPutMessage方法写入消息,在DLedger模式下使用的是
RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡.
RocketMQ有两种获取消息的方式,分别为推模式和拉模式。 推模式 推模式在 【RocketMQ】消息的拉取 一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者
主从同步的实现逻辑主要在 HAService 中,在 DefaultMessageStore 的构造函数中,对 HAService 进行了实例化,并在start方法中,启动了 HAService :
在 【RocketMQ】消息的拉取 一文中可知,消费者在启动的时候,会创建消息拉取API对象 PullAPIWrapper ,调用pullKernelImpl方法向Broker发送拉取消息的
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息; 预设值的延迟时间间隔为: 1s、 5s、 10s、 30s、 1m
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。 其中,level=0 级表示不延时,level=1 表示 1 级延时,le
我尝试给 rockerMQ broker 加注星标,但我收到了错误消息: There is insufficient memory for the Java Runtime Environment t
我是一名优秀的程序员,十分优秀!