- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
在上一篇博客中MyRaft实现了日志复制功能,按照计划接下来需要实现日志压缩.
我们知道raft协议是基于日志复制的协议,日志数据是raft的核心。但随着raft集群的持续工作,raft的日志文件将会维护越来越多的日志,而这会带来以下几个问题.
考虑到绝大多数的状态机中存储的数据并不都是新增,而更多的是对已有数据的更新,则状态机中所存储的数据量通常会远小于raft日志的总大小。例如K/V数据库,对相同key的N次操作(整体更新操作),只有最后一次操作是实际有效的,而在此之前的针对该key的raft日志其实已经没有保存的必要了。 因此raft的作者在论文的日志压缩一节中提到了几种日志压缩的算法(基于快照的、基于LSM树的),raft选择了更容易理解和实现的、基于状态机快照的算法作为日志压缩的基础.
raft日志压缩实现中有以下几个关键点:
下面开始结合源码分析MyRaft的日志压缩功能 。
/**
* raft快照对象
* */
public class RaftSnapshot {
/**
* 快照所包含的最后一条log的索引编号
* */
private long lastIncludedIndex;
/**
* 快照所包含的最后一条log的任期编号
* */
private int lastIncludedTerm;
/**
* 快照数据
* (注意:暂不考虑快照过大导致byte数组放不下的情况)
* */
private byte[] snapshotData = new byte[0];
}
public class SnapshotModule {
private static final Logger logger = LoggerFactory.getLogger(SnapshotModule.class);
private final RaftServer currentServer;
private final File snapshotFile;
private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
private static final String snapshotFileName = "snapshot.txt";
private static final String snapshotTempFileName = "snapshot-temp.txt";
/**
* 存放快照实际数据的偏移量(lastIncludedIndex + lastIncludedTerm 共两个字段后存放快照)
* */
private static final int actualDataOffset = 4 + 8;
public SnapshotModule(RaftServer currentServer) {
this.currentServer = currentServer;
// 保证目录是存在的
String snapshotFileDir = getSnapshotFileDir();
new File(snapshotFileDir).mkdirs();
snapshotFile = new File(snapshotFileDir + File.separator + snapshotFileName);
File snapshotTempFile = new File(snapshotFileDir + File.separator + snapshotTempFileName);
if(!snapshotFile.exists() && snapshotTempFile.exists()){
// 快照文件不存在,但是快照的临时文件存在。说明在写完临时文件并重命名之前宕机了(临时文件是最新的完整快照)
// 将tempFile重命名为快照文件
snapshotTempFile.renameTo(snapshotFile);
logger.info("snapshot-temp file rename to snapshot file success!");
}
}
/**
* 持久化一个新的快照文件
* (暂不考虑快照太大的问题)
* */
public void persistentNewSnapshotFile(RaftSnapshot raftSnapshot){
logger.info("do persistentNewSnapshotFile raftSnapshot={}",raftSnapshot);
writeLock.lock();
try {
String userPath = getSnapshotFileDir();
// 新的文件名是tempFile
String newSnapshotFilePath = userPath + File.separator + snapshotTempFileName;
logger.info("do persistentNewSnapshotFile newSnapshotFilePath={}", newSnapshotFilePath);
File snapshotTempFile = new File(newSnapshotFilePath);
try (RandomAccessFile newSnapshotFile = new RandomAccessFile(snapshotTempFile, "rw")) {
MyRaftFileUtil.createFile(snapshotTempFile);
newSnapshotFile.writeInt(raftSnapshot.getLastIncludedTerm());
newSnapshotFile.writeLong(raftSnapshot.getLastIncludedIndex());
newSnapshotFile.write(raftSnapshot.getSnapshotData());
logger.info("do persistentNewSnapshotFile success! raftSnapshot={}", raftSnapshot);
} catch (IOException e) {
throw new MyRaftException("persistentNewSnapshotFile error", e);
}
// 先删掉原来的快照文件,然后把临时文件重名名为快照文件(delete后、重命名前可能宕机,但是没关系,重启后构造方法里做了对应处理)
snapshotFile.delete();
snapshotTempFile.renameTo(snapshotFile);
}finally {
writeLock.unlock();
}
}
/**
* 安装快照
* */
public void appendInstallSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam){
logger.info("do appendInstallSnapshot installSnapshotRpcParam={}",installSnapshotRpcParam);
writeLock.lock();
String userPath = getSnapshotFileDir();
// 新的文件名是tempFile
String newSnapshotFilePath = userPath + File.separator + snapshotTempFileName;
logger.info("do appendInstallSnapshot newSnapshotFilePath={}", newSnapshotFilePath);
File snapshotTempFile = new File(newSnapshotFilePath);
try (RandomAccessFile newSnapshotFile = new RandomAccessFile(snapshotTempFile, "rw")) {
MyRaftFileUtil.createFile(snapshotTempFile);
if(installSnapshotRpcParam.getOffset() == 0){
newSnapshotFile.setLength(0);
}
newSnapshotFile.seek(0);
newSnapshotFile.writeInt(installSnapshotRpcParam.getLastIncludedTerm());
newSnapshotFile.writeLong(installSnapshotRpcParam.getLastIncludedIndex());
// 文件指针偏移,找到实际应该写入快照数据的地方
newSnapshotFile.seek(actualDataOffset + installSnapshotRpcParam.getOffset());
// 写入快照数据
newSnapshotFile.write(installSnapshotRpcParam.getData());
logger.info("do appendInstallSnapshot success! installSnapshotRpcParam={}", installSnapshotRpcParam);
} catch (IOException e) {
throw new MyRaftException("appendInstallSnapshot error", e);
} finally {
writeLock.unlock();
}
if(installSnapshotRpcParam.isDone()) {
writeLock.lock();
try {
// 先删掉原来的快照文件,然后把临时文件重名名为快照文件(delete后、重命名前可能宕机,但是没关系,重启后构造方法里做了对应处理)
snapshotFile.delete();
snapshotTempFile.renameTo(snapshotFile);
} finally {
writeLock.unlock();
}
}
}
/**
* 没有实际快照数据,只有元数据
* */
public RaftSnapshot readSnapshotMetaData(){
if(this.snapshotFile.length() == 0){
return null;
}
readLock.lock();
try(RandomAccessFile latestSnapshotRaFile = new RandomAccessFile(this.snapshotFile, "r")) {
RaftSnapshot raftSnapshot = new RaftSnapshot();
raftSnapshot.setLastIncludedTerm(latestSnapshotRaFile.readInt());
raftSnapshot.setLastIncludedIndex(latestSnapshotRaFile.readLong());
return raftSnapshot;
} catch (IOException e) {
throw new MyRaftException("readSnapshotNoData error",e);
} finally {
readLock.unlock();
}
}
public RaftSnapshot readSnapshot(){
logger.info("do readSnapshot");
if(this.snapshotFile.length() == 0){
logger.info("snapshotFile is empty!");
return null;
}
readLock.lock();
try(RandomAccessFile latestSnapshotRaFile = new RandomAccessFile(this.snapshotFile, "r")) {
logger.info("do readSnapshot");
RaftSnapshot latestSnapshot = new RaftSnapshot();
latestSnapshot.setLastIncludedTerm(latestSnapshotRaFile.readInt());
latestSnapshot.setLastIncludedIndex(latestSnapshotRaFile.readLong());
// 读取snapshot的实际数据(简单起见,暂不考虑快照太大内存溢出的问题,可以优化为按照偏移量多次读取)
byte[] snapshotData = new byte[(int) (this.snapshotFile.length() - actualDataOffset)];
latestSnapshotRaFile.read(snapshotData);
latestSnapshot.setSnapshotData(snapshotData);
logger.info("readSnapshot success! readSnapshot={}",latestSnapshot);
return latestSnapshot;
} catch (IOException e) {
throw new MyRaftException("readSnapshot error",e);
} finally {
readLock.unlock();
}
}
private String getSnapshotFileDir(){
return System.getProperty("user.dir")
+ File.separator + currentServer.getServerId()
+ File.separator + "snapshot";
}
}
/**
* 构建快照的检查
* */
private void buildSnapshotCheck() {
try {
if(!readLock.tryLock(1,TimeUnit.SECONDS)){
logger.info("buildSnapshotCheck lock fail, quick return!");
return;
}
} catch (InterruptedException e) {
throw new MyRaftException("buildSnapshotCheck tryLock error!",e);
}
try {
long logFileLength = this.logFile.length();
long logFileThreshold = currentServer.getRaftConfig().getLogFileThreshold();
if (logFileLength < logFileThreshold) {
logger.info("logFileLength not reach threshold, do nothing. logFileLength={},threshold={}", logFileLength, logFileThreshold);
return;
}
logger.info("logFileLength already reach threshold, start buildSnapshot! logFileLength={},threshold={}", logFileLength, logFileThreshold);
byte[] snapshot = currentServer.getKvReplicationStateMachine().buildSnapshot();
LogEntry lastCommittedLogEntry = readLocalLog(this.lastCommittedIndex);
RaftSnapshot raftSnapshot = new RaftSnapshot();
raftSnapshot.setLastIncludedTerm(lastCommittedLogEntry.getLogTerm());
raftSnapshot.setLastIncludedIndex(lastCommittedLogEntry.getLogIndex());
raftSnapshot.setSnapshotData(snapshot);
// 持久化最新的一份快照
currentServer.getSnapshotModule().persistentNewSnapshotFile(raftSnapshot);
}finally {
readLock.unlock();
}
try {
buildNewLogFileRemoveCommittedLog();
} catch (IOException e) {
logger.error("buildNewLogFileRemoveCommittedLog error",e);
}
}
/**
* 构建一个删除了已提交日志的新日志文件(日志压缩到快照里了)
* */
private void buildNewLogFileRemoveCommittedLog() throws IOException {
long lastCommitted = getLastCommittedIndex();
long lastIndex = getLastIndex();
// 暂不考虑读取太多日志造成内存溢出的问题
List<LocalLogEntry> logEntryList;
if(lastCommitted == lastIndex){
// (lastCommitted == lastIndex) 所有日志都提交了,创建一个空的新日志文件
logEntryList = new ArrayList<>();
}else{
// 还有日志没提交,把没提交的记录到新的日志文件中
logEntryList = readLocalLog(lastCommitted+1,lastIndex);
}
File tempLogFile = new File(getLogFileDir() + File.separator + logTempFileName);
MyRaftFileUtil.createFile(tempLogFile);
try(RandomAccessFile randomAccessTempLogFile = new RandomAccessFile(tempLogFile,"rw")) {
long currentOffset = 0;
for (LogEntry logEntry : logEntryList) {
// 写入日志
writeLog(randomAccessTempLogFile, logEntry, currentOffset);
currentOffset = randomAccessTempLogFile.getFilePointer();
}
this.currentOffset = currentOffset;
}
File tempLogMeteDataFile = new File(getLogFileDir() + File.separator + logMetaDataTempFileName);
MyRaftFileUtil.createFile(tempLogMeteDataFile);
// 临时的日志元数据文件写入数据
refreshMetadata(tempLogMeteDataFile,currentOffset);
writeLock.lock();
try{
// 先删掉原来的日志文件,然后把临时文件重名名为日志文件(delete后、重命名前可能宕机,但是没关系,重启后构造方法里做了对应处理)
this.logFile.delete();
boolean renameLogFileResult = tempLogFile.renameTo(this.logFile);
if(!renameLogFileResult){
logger.error("renameLogFile error!");
}
// 先删掉原来的日志元数据文件,然后把临时文件重名名为日志元数据文件(delete后、重命名前可能宕机,但是没关系,重启后构造方法里做了对应处理)
this.logMetaDataFile.delete();
boolean renameTempLogMeteDataFileResult = tempLogMeteDataFile.renameTo(this.logMetaDataFile);
if(!renameTempLogMeteDataFileResult){
logger.error("renameTempLogMeteDataFile error!");
}
}finally {
writeLock.unlock();
}
}
相比lab2,在引入了快照压缩功能后,leader侧的日志复制逻辑需要进行一点小小的拓展。 即当要向follower同步某一条日志时,对应日志可能已经被压缩掉了,因此此时需要改为使用installSnapshotRpc来完成快照的安装.
/**
* leader向集群广播,令follower复制新的日志条目
* */
public List<AppendEntriesRpcResult> replicationLogEntry(LogEntry lastEntry) {
List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();
List<Future<AppendEntriesRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());
for(RaftService node : otherNodeInCluster){
// 并行发送rpc,要求follower复制日志
Future<AppendEntriesRpcResult> future = this.rpcThreadPool.submit(()->{
logger.info("replicationLogEntry start!");
long nextIndex = this.currentServer.getNextIndexMap().get(node);
AppendEntriesRpcResult finallyResult = null;
// If last log index ≥ nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex
while(lastEntry.getLogIndex() >= nextIndex){
AppendEntriesRpcParam appendEntriesRpcParam = new AppendEntriesRpcParam();
appendEntriesRpcParam.setLeaderId(currentServer.getServerId());
appendEntriesRpcParam.setTerm(currentServer.getCurrentTerm());
appendEntriesRpcParam.setLeaderCommit(this.lastCommittedIndex);
int appendLogEntryBatchNum = this.currentServer.getRaftConfig().getAppendLogEntryBatchNum();
// 要发送的日志最大index值
// (追进度的时候,就是nextIndex开始批量发送appendLogEntryBatchNum-1条(左闭右闭区间);如果进度差不多那就是以lastEntry.index为界限全部发送出去)
long logIndexEnd = Math.min(nextIndex+(appendLogEntryBatchNum-1), lastEntry.getLogIndex());
// 读取出[nextIndex-1,logIndexEnd]的日志(左闭右闭区间),-1往前一位是为了读取出preLog的信息
List<LocalLogEntry> localLogEntryList = this.readLocalLog(nextIndex-1,logIndexEnd);
logger.info("replicationLogEntry doing! nextIndex={},logIndexEnd={},LocalLogEntryList={}",
nextIndex,logIndexEnd,JsonUtil.obj2Str(localLogEntryList));
List<LogEntry> logEntryList = localLogEntryList.stream()
.map(LogEntry::toLogEntry)
.collect(Collectors.toList());
// 索引区间大小
long indexRange = (logIndexEnd - nextIndex + 1);
// 假设索引区间大小为N,可能有三种情况
// 1. 查出N条日志,所需要的日志全都在本地日志文件里没有被压缩
// 2. 查出1至N-1条日志,部分日志被压缩到快照里 or 就是只有那么多日志(一次批量查5条,但当前总共只写入了3条)
// 3. 查出0条日志,需要的日志全部被压缩了(因为是先落盘再广播,如果既没有日志也没有快照那就是有bug)
if(logEntryList.size() == indexRange+1){
// 查出了区间内的所有日志(case 1)
logger.info("find log size match!");
// preLog
LogEntry preLogEntry = logEntryList.get(0);
// 实际需要传输的log
List<LogEntry> needAppendLogList = logEntryList.subList(1,logEntryList.size());
appendEntriesRpcParam.setEntries(needAppendLogList);
appendEntriesRpcParam.setPrevLogIndex(preLogEntry.getLogIndex());
appendEntriesRpcParam.setPrevLogTerm(preLogEntry.getLogTerm());
}else if(logEntryList.size() > 0 && logEntryList.size() <= indexRange){
// 查出了部分日志(case 2)
// 新增日志压缩功能后,查出来的数据个数小于指定的区间不一定就是查到第一条数据,还有可能是日志被压缩了
logger.info("find log size not match!");
RaftSnapshot readSnapshotNoData = currentServer.getSnapshotModule().readSnapshotMetaData();
if(readSnapshotNoData != null){
logger.info("has snapshot! readSnapshotNoData={}",readSnapshotNoData);
// 存在快照,使用快照里保存的上一条日志信息
appendEntriesRpcParam.setPrevLogIndex(readSnapshotNoData.getLastIncludedIndex());
appendEntriesRpcParam.setPrevLogTerm(readSnapshotNoData.getLastIncludedTerm());
}else{
logger.info("no snapshot! prevLogIndex=-1, prevLogTerm=-1");
// 没有快照,说明恰好发送第一条日志记录(比如appendLogEntryBatchNum=5,但一共只有3条日志全查出来了)
// 第一条记录的prev的index和term都是-1
appendEntriesRpcParam.setPrevLogIndex(-1);
appendEntriesRpcParam.setPrevLogTerm(-1);
}
appendEntriesRpcParam.setEntries(logEntryList);
} else if(logEntryList.isEmpty()){
// 日志压缩把要同步的日志删除掉了,只能使用installSnapshotRpc了(case 3)
logger.info("can not find and log entry,maybe delete for log compress");
// 快照压缩导致leader更早的index日志已经不存在了
// 应该改为使用installSnapshot来同步进度
RaftSnapshot raftSnapshot = currentServer.getSnapshotModule().readSnapshot();
doInstallSnapshotRpc(node,raftSnapshot,currentServer);
// 走到这里,一般是成功的完成了快照的安装。目标follower目前已经有了包括lastIncludedIndex以及之前的所有日志
// 如果是因为成为follower快速返回,则再循环一次就结束了
nextIndex = raftSnapshot.getLastIncludedIndex() + 1;
continue;
} else{
// 走到这里不符合预期,日志模块有bug
throw new MyRaftException("replicationLogEntry logEntryList size error!" +
" nextIndex=" + nextIndex + " logEntryList.size=" + logEntryList.size());
}
logger.info("leader do appendEntries start, node={}, appendEntriesRpcParam={}",node,appendEntriesRpcParam);
AppendEntriesRpcResult appendEntriesRpcResult = node.appendEntries(appendEntriesRpcParam);
logger.info("leader do appendEntries end, node={}, appendEntriesRpcResult={}",node,appendEntriesRpcResult);
finallyResult = appendEntriesRpcResult;
// 收到更高任期的处理
boolean beFollower = currentServer.processCommunicationHigherTerm(appendEntriesRpcResult.getTerm());
if(beFollower){
return appendEntriesRpcResult;
}
if(appendEntriesRpcResult.isSuccess()){
logger.info("appendEntriesRpcResult is success, node={}",node);
// If successful: update nextIndex and matchIndex for follower (§5.3)
// 同步成功了,nextIndex递增一位
this.currentServer.getNextIndexMap().put(node,nextIndex+1);
this.currentServer.getMatchIndexMap().put(node,nextIndex);
nextIndex++;
}else{
// 因为日志对不上导致一致性检查没通过,同步没成功,nextIndex往后退一位
logger.info("appendEntriesRpcResult is false, node={}",node);
// If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
nextIndex--;
this.currentServer.getNextIndexMap().put(node,nextIndex);
}
}
if(finallyResult == null){
// 说明有bug
throw new MyRaftException("replicationLogEntry finallyResult is null!");
}
logger.info("finallyResult={},node={}",node,finallyResult);
return finallyResult;
});
futureList.add(future);
}
// 获得结果
List<AppendEntriesRpcResult> appendEntriesRpcResultList = CommonUtil.concurrentGetRpcFutureResult(
"do appendEntries", futureList,
this.rpcThreadPool,2, TimeUnit.SECONDS);
logger.info("leader replicationLogEntry appendEntriesRpcResultList={}",appendEntriesRpcResultList);
return appendEntriesRpcResultList;
}
前面提到,follower侧在进行日志一致性校验时,也可能出现恰好前一条日志被压缩到快照里的情况。 因此需要在当前日志不存在时,尝试通过SnapshotModule读取快照数据中的前一条日志信息来进行比对.
public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
if(appendEntriesRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm()){
// Reply false if term < currentTerm (§5.1)
// 拒绝处理任期低于自己的老leader的请求
logger.info("doAppendEntries term < currentTerm");
return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
}
if(appendEntriesRpcParam.getTerm() >= this.raftServerMetaDataPersistentModule.getCurrentTerm()){
// appendEntries请求中任期值如果大于自己,说明已经有一个更新的leader了,自己转为follower,并且以对方更大的任期为准
this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
this.currentLeader = appendEntriesRpcParam.getLeaderId();
this.raftServerMetaDataPersistentModule.setCurrentTerm(appendEntriesRpcParam.getTerm());
}
if(appendEntriesRpcParam.getEntries() == null || appendEntriesRpcParam.getEntries().isEmpty()){
// 来自leader的心跳处理,清理掉之前选举的votedFor
this.cleanVotedFor();
// entries为空,说明是心跳请求,刷新一下最近收到心跳的时间
raftLeaderElectionModule.refreshLastHeartbeatTime();
long currentLastCommittedIndex = logModule.getLastCommittedIndex();
logger.debug("doAppendEntries heartbeat leaderCommit={},currentLastCommittedIndex={}",
appendEntriesRpcParam.getLeaderCommit(),currentLastCommittedIndex);
if(appendEntriesRpcParam.getLeaderCommit() > currentLastCommittedIndex) {
// 心跳处理里,如果leader当前已提交的日志进度超过了当前节点的进度,令当前节点状态机也跟上
// 如果leaderCommit >= logModule.getLastIndex(),说明当前节点的日志进度不足,但可以把目前已有的日志都提交给状态机去执行
// 如果leaderCommit < logModule.getLastIndex(),说明当前节点进度比较快,有一些日志是leader已复制但还没提交的,把leader已提交的那一部分作用到状态机就行
long minNeedCommittedIndex = Math.min(appendEntriesRpcParam.getLeaderCommit(), logModule.getLastIndex());
pushStatemachineApply(minNeedCommittedIndex);
}
// 心跳请求,直接返回
return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),true);
}
// logEntries不为空,是真实的日志复制rpc
logger.info("do real log append! appendEntriesRpcParam={}",appendEntriesRpcParam);
// AppendEntry可靠性校验,如果prevLogIndex和prevLogTerm不匹配,则需要返回false,让leader发更早的日志过来
{
LogEntry localPrevLogEntry = logModule.readLocalLog(appendEntriesRpcParam.getPrevLogIndex());
if(localPrevLogEntry == null){
// 没有查到prevLogIndex对应的日志,分两种情况
RaftSnapshot raftSnapshot = snapshotModule.readSnapshotMetaData();
localPrevLogEntry = new LogEntry();
if(raftSnapshot == null){
// 当前节点日志条目为空,又没有快照,说明完全没有日志(默认任期为-1,这个是约定)
localPrevLogEntry.setLogIndex(-1);
localPrevLogEntry.setLogTerm(-1);
}else{
// 日志里没有,但是有快照(把快照里记录的最后一条日志信息与leader的参数比对)
localPrevLogEntry.setLogIndex(raftSnapshot.getLastIncludedIndex());
localPrevLogEntry.setLogTerm(raftSnapshot.getLastIncludedTerm());
}
}
if (localPrevLogEntry.getLogTerm() != appendEntriesRpcParam.getPrevLogTerm()) {
// Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
// 本地日志和参数中的PrevLogIndex和PrevLogTerm对不上(对应日志不存在,或者任期对不上)
logger.info("doAppendEntries localPrevLogEntry not match, localLogEntry={}",localPrevLogEntry);
return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
}
}
// 走到这里说明找到了最新的一条匹配的记录
logger.info("doAppendEntries localEntry is match");
List<LogEntry> newLogEntryList = appendEntriesRpcParam.getEntries();
// 1. Append any new entries not already in the log
// 2. If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
// 新日志的复制操作(直接整个覆盖掉prevLogIndex之后的所有日志,以leader发过来的日志为准)
logModule.writeLocalLog(newLogEntryList, appendEntriesRpcParam.getPrevLogIndex());
// If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
if(appendEntriesRpcParam.getLeaderCommit() > logModule.getLastCommittedIndex()){
// 如果leaderCommit更大,说明当前节点的同步进度慢于leader,以新的entry里的index为准(更高的index还没有在本地保存(因为上面的appendEntry有效性检查))
// 如果index of last new entry更大,说明当前节点的同步进度是和leader相匹配的,commitIndex以leader最新提交的为准
LogEntry lastNewEntry = newLogEntryList.get(newLogEntryList.size()-1);
long lastCommittedIndex = Math.min(appendEntriesRpcParam.getLeaderCommit(), lastNewEntry.getLogIndex());
pushStatemachineApply(lastCommittedIndex);
}
// 返回成功
return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(), true);
}
public static void doInstallSnapshotRpc(RaftService targetNode, RaftSnapshot raftSnapshot, RaftServer currentServer){
int installSnapshotBlockSize = currentServer.getRaftConfig().getInstallSnapshotBlockSize();
byte[] completeSnapshotData = raftSnapshot.getSnapshotData();
int currentOffset = 0;
while(true){
InstallSnapshotRpcParam installSnapshotRpcParam = new InstallSnapshotRpcParam();
installSnapshotRpcParam.setLastIncludedIndex(raftSnapshot.getLastIncludedIndex());
installSnapshotRpcParam.setTerm(currentServer.getCurrentTerm());
installSnapshotRpcParam.setLeaderId(currentServer.getServerId());
installSnapshotRpcParam.setLastIncludedTerm(raftSnapshot.getLastIncludedTerm());
installSnapshotRpcParam.setOffset(currentOffset);
// 填充每次传输的数据块
int blockSize = Math.min(installSnapshotBlockSize,completeSnapshotData.length-currentOffset);
byte[] block = new byte[blockSize];
System.arraycopy(completeSnapshotData,currentOffset,block,0,blockSize);
installSnapshotRpcParam.setData(block);
currentOffset += installSnapshotBlockSize;
if(currentOffset >= completeSnapshotData.length){
installSnapshotRpcParam.setDone(true);
}else{
installSnapshotRpcParam.setDone(false);
}
InstallSnapshotRpcResult installSnapshotRpcResult = targetNode.installSnapshot(installSnapshotRpcParam);
boolean beFollower = currentServer.processCommunicationHigherTerm(installSnapshotRpcResult.getTerm());
if(beFollower){
// 传输过程中发现自己已经不再是leader了,快速结束
logger.info("doInstallSnapshotRpc beFollower quick return!");
return;
}
if(installSnapshotRpcParam.isDone()){
// 快照整体安装完毕
logger.info("doInstallSnapshotRpc isDone!");
return;
}
}
}
public InstallSnapshotRpcResult installSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam) {
logger.info("installSnapshot start! serverId={},installSnapshotRpcParam={}",this.serverId,installSnapshotRpcParam);
if(installSnapshotRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm()){
// Reply immediately if term < currentTerm
// 拒绝处理任期低于自己的老leader的请求
logger.info("installSnapshot term < currentTerm");
return new InstallSnapshotRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm());
}
// 安装快照
this.snapshotModule.appendInstallSnapshot(installSnapshotRpcParam);
// 快照已经完全安装好了
if(installSnapshotRpcParam.isDone()){
// discard any existing or partial snapshot with a smaller index
// 快照整体安装完毕,清理掉index小于等于快照中lastIncludedIndex的所有日志(日志压缩)
logModule.compressLogBySnapshot(installSnapshotRpcParam);
// Reset state machine using snapshot contents (and load snapshot’s cluster configuration)
// follower的状态机重新安装快照
RaftSnapshot raftSnapshot = this.snapshotModule.readSnapshot();
kvReplicationStateMachine.installSnapshot(raftSnapshot.getSnapshotData());
}
logger.info("installSnapshot end! serverId={}",this.serverId);
return new InstallSnapshotRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm());
}
/**
* discard any existing or partial snapshot with a smaller index
* 快照整体安装完毕,清理掉index小于等于快照中lastIncludedIndex的所有日志
* */
public void compressLogBySnapshot(InstallSnapshotRpcParam installSnapshotRpcParam){
this.lastCommittedIndex = installSnapshotRpcParam.getLastIncludedIndex();
if(this.lastIndex < this.lastCommittedIndex){
this.lastIndex = this.lastCommittedIndex;
}
try {
buildNewLogFileRemoveCommittedLog();
} catch (IOException e) {
throw new MyRaftException("compressLogBySnapshot error",e);
}
}
和lab2中一样,通过启动一个raft集群并触发几个case可以验证MyRaft日志压缩功能的正确性.
最后此篇关于手写raft(三)实现日志压缩的文章就讲到这里了,如果你想了解更多关于手写raft(三)实现日志压缩的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
这是真的: log(A) + log(B) = log(A * B) [0] 这也是真的吗? O(log(A)) + O(log(B)) = O(log(A * B)) [1] 据我了解 O(f
日志 日志是构建工具的主要界面。如果日志太多,真正的警告和问题容易被隐藏。另一方面,如果出了错,你需要找出相关的信息。Gradle 定义了6个日志级别,如表 18.1,“日志级别”所示。除了那些您通
日志 关键进程日志如下…(将 替换为启动服务的用户,将 替换为计算机名称) NameNode: $ HADOOP_HOME / logs / hadoop- -namenode- .log Da
我正在探索项目的 git 历史 FFMpeg .我在提交之间对每个文件执行了更改 517573a67088b5c7a25c18373434e3448892ee93和 80bb65fafab1d2f5f
我不知道如何在 loggly 中使用正则表达式进行搜索。例如,使用表达式 /24nonstop.+7554/ 记录我想查找的内容. { "level_name": "WARNING", "ex
有没有办法为 API 调用打开日志记录? 我们有一个第三方应用程序在使用我们的商店时遇到问题,希望获得一些调试信息。 ~我已经搜索了 bt 一无所获。 我正在使用 1.7 最佳答案 在一段受控的时间内
我正在尝试获取 SVN 中所有副本/移动/等的固定路径的日志历史记录(如果可能的话,递归地)。实际上,我试图避免 peg revisions ,并将日志应用于路径而不是对象。 svn 手册提出了这个问
如何在命令行中运行 NAnt 脚本并在日志文件中获取每个任务的时间? using nant task or NAnt -buildfile:testscript.build testnanttarg
是否有任何默认方式来记录哪些用户代理访问了您的服务器?我需要编制一份访问我们网站的浏览器列表,以便我们知道我们最能支持什么。 谢谢! 最佳答案 日志CGI.HTTP_USER_AGENT ,也许在 A
我在我的应用程序中使用 Spring 发送电子邮件。 我想在发送电子邮件时记录 imap 服务器操作。 我尝试按如下方式在我的 applicationContext.xml 中实现日志:
我已经运行一个 pod 一个多星期了,从开始到现在没有重启过。但是,我仍然无法查看自它启动以来的日志,它只提供最近两天的日志。容器是否有任何日志轮换策略以及如何根据大小或日期控制轮换? 我尝试了以下命
背景: 我正在设置我的第一个 flex 堆栈,尽管我将开始简单,但是我想确保我从良好的体系结构开始。我最终希望有以下解决方案:托管指标,服务器日志(expressjs APM),单页应用程序监视(AP
常规的 hg log 命令给出每个变更集至少 4 行的输出。例如 changeset: 238:03a214f2a1cf user: My Name date: Th
我在我的项目中使用 Spring iBatis 框架。然后使用 logback 进行记录。然后,在检查日志文件时,我可以看到系统正在使用的数据库...出于安全目的我想隐藏它 这是示例日志.. 12:2
我想使用 hg log 生成一个简短的变更日志,涵盖最新版本的变更。发行版标有“v”前缀,例如“v0.9.1”或“v1.0”。是否可以使用 revsets 选择以“v”开头的最后两个标签之间的范围,不
我是 PHP 的新手,所以如果有一个简单的答案,请原谅我。我在 stackoverflow 中搜索过任何类似的问题,但找不到任何帮助。 我正在开发一个现有的基于 php 的应用程序,我只需要能够将对象
我有一个名为 Radius 的程序可以验证用户登录。运行在CentOS服务器上 日志在/var/log/radius.log 中 它们如下 Mon Jul 24 22:17:08 2017 : Aut
我最近从使用“日志”切换到“日志”。 到目前为止,还不错,但我缺少一项关键功能——在运行时更改最低级别的能力。 在“logging',我可以调用 myLogger.setLevel(logging.I
假设我们有速度关键的系统(例如统计/分析、套接字编程等),我们如何设计跟踪和日志。 更具体地说,日志和跟踪通常会降低性能(即使我们有关闭机制或冗长的扩展机制)。在这种情况下,是否有任何关于如何“放置”
有人知道这个 don't panic 日志包含什么类型的信息吗? /data/dontpanic 然后,我该如何分析这个日志? 最佳答案 我发现这个文件夹在内核崩溃发生后包含了一些 apanic 文件
我是一名优秀的程序员,十分优秀!