- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
raft的论文 中将raft算法的功能分解为4个模块:
其中前两项“leader选举”和“日志复制”是raft算法的基础,而后两项“日志压缩”和“集群成员动态变更”属于raft算法在功能上的重要优化.
raft论文中英翻译 。
通过raft的论文或者其它相关资料,读者基本能大致理解raft的工作原理。 但纸上得来终觉浅,绝知此事要躬行,亲手实践才能更好的把握raft中的精巧细节,加深对raft算法的理解,更有效的阅读基于raft或其它一致性协议的开源项目源码.
在这个系列博客中会带领读者一步步实现一个基于raft算法的简易KV数据库,即MyRaft。MyRaft的实现基于原始的raft算法,没有额外的优化,目的是为了保证实现的简单性。 MyRaft实现了raft论文中提到的三个功能,即”leader选举“、”日志复制“和”日志压缩“(在实践中发现“集群成员动态变更”对原有逻辑有较大改动而大幅增加了复杂度,限于个人水平暂不实现)。 三个功能会通过三次迭代实验逐步完成,其中每个迭代都会以博客的形式分享出来.
public interface RaftService {
/**
* 请求投票 requestVote
*
* Receiver implementation:
* 1. Reply false if term < currentTerm (§5.1)
* 2. If votedFor is null or candidateId, and candidate’s log is at
* least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
*
* 接受者需要实现以下功能:
* 1. 如果参数中的任期值term小于当前自己的任期值currentTerm,则返回false不同意投票给调用者
* 2. 如果自己还没有投票(FIFO)或者已经投票给了candidateId对应的节点(幂等),
* 并且候选人的日志至少与被调用者的日志一样新(比较日志的任期值和索引值),则投票给调用者(返回值里voteGranted为true)
* */
RequestVoteRpcResult requestVote(RequestVoteRpcParam requestVoteRpcParam);
/**
* 追加日志条目 AppendEntries
* */
AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam);
}
/**
* 请求投票的RPC接口参数对象
*/
public class RequestVoteRpcParam implements Serializable {
/**
* 候选人的任期编号
* */
private int term;
/**
* 候选人的Id
* */
private String candidateId;
/**
* 候选人最新日志的索引编号
* */
private long lastLogIndex;
/**
* 候选人最新日志对应的任期编号
* */
private int lastLogTerm;
}
/**
* 请求投票的RPC接口响应对象
* */
public class RequestVoteRpcResult implements Serializable {
/**
* 被调用者当前的任期值
* */
private int term;
/**
* 是否同意投票给调用者
* */
private boolean voteGranted;
}
/**
* 追加日志条目的RPC接口参数对象
* */
public class AppendEntriesRpcParam implements Serializable {
/**
* 当前leader的任期值
* */
private int term;
/**
* leader的id
* */
private String leaderId;
}
/**
* 追加日志条目的RPC接口响应对象
* */
public class AppendEntriesRpcResult implements Serializable {
/**
* 被调用者当前的任期值
* */
private int term;
/**
* 是否处理成功
* */
private boolean success;
}
/**
* raft的rpc服务
* */
public class RaftRpcServer extends RaftServer {
private final Registry registry;
private final RaftNodeConfig currentNodeConfig;
public RaftRpcServer(RaftConfig raftConfig, Registry registry){
super(raftConfig);
this.currentNodeConfig = raftConfig.getCurrentNodeConfig();
this.registry = registry;
}
@Override
public void init(List<RaftService> otherNodeInCluster) {
// 先初始化内部模块
super.init(otherNodeInCluster);
// 初始化内部的模块后,启动rpc
initRpcServer();
}
public List<RaftService> getRpcProxyList(List<RaftNodeConfig> otherNodeInCluster){
return initRpcConsumer(otherNodeInCluster);
}
private List<RaftService> initRpcConsumer(List<RaftNodeConfig> otherNodeInCluster){
ConsumerBootstrap consumerBootstrap = new ConsumerBootstrap()
.registry(registry)
.loadBalance(new SimpleRoundRobinBalance());
// 注册消费者
Consumer<RaftService> consumer = consumerBootstrap.registerConsumer(RaftService.class,new FastFailInvoker());
RaftService raftServiceProxy = consumer.getProxy();
List<RaftService> raftRpcConsumerList = new ArrayList<>();
for(RaftNodeConfig raftNodeConfig : otherNodeInCluster){
// 使用rpc代理的客户端
raftRpcConsumerList.add(new RaftRpcConsumer(raftNodeConfig,raftServiceProxy));
}
return raftRpcConsumerList;
}
private void initRpcServer(){
URLAddress providerURLAddress = new URLAddress(currentNodeConfig.getIp(),currentNodeConfig.getPort());
Provider<RaftService> provider = new Provider<>();
provider.setInterfaceClass(RaftService.class);
provider.setRef(this);
provider.setUrlAddress(providerURLAddress);
provider.setRegistry(registry);
provider.export();
NettyServer nettyServer = new NettyServer(providerURLAddress);
nettyServer.init();
}
}
public class RaftRpcConsumer implements RaftService {
private static final Logger logger = LoggerFactory.getLogger(RaftRpcConsumer.class);
private final RaftNodeConfig targetNodeConfig;
private final RaftService raftServiceProxy;
public RaftRpcConsumer(RaftNodeConfig targetNodeConfig, RaftService proxyRaftService) {
this.targetNodeConfig = targetNodeConfig;
this.raftServiceProxy = proxyRaftService;
}
@Override
public RequestVoteRpcResult requestVote(RequestVoteRpcParam requestVoteRpcParam) {
// 强制指定rpc目标的ip/port
setTargetProviderUrl();
RequestVoteRpcResult result = raftServiceProxy.requestVote(requestVoteRpcParam);
return result;
}
@Override
public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
// 强制指定rpc目标的ip/port
setTargetProviderUrl();
AppendEntriesRpcResult result = raftServiceProxy.appendEntries(appendEntriesRpcParam);
return result;
}
private void setTargetProviderUrl(){
ConsumerRpcContext consumerRpcContext = ConsumerRpcContextHolder.getConsumerRpcContext();
consumerRpcContext.setTargetProviderAddress(
new URLAddress(targetNodeConfig.getIp(),targetNodeConfig.getPort()));
}
}
public class RaftServerMetaData {
/**
* 当前服务器的任期值
* */
private int currentTerm;
/**
* 当前服务器在此之前投票给了谁?
* (候选者的serverId,如果还没有投递就是null)
* */
private String votedFor;
}
public class RaftServerMetaDataPersistentModule {
/**
* 当前服务器的任期值
* */
private volatile int currentTerm;
/**
* 当前服务器在此之前投票给了谁?
* (候选者的serverId,如果还没有投递就是null)
* */
private volatile String votedFor;
private final File persistenceFile;
private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
public RaftServerMetaDataPersistentModule(String serverId) {
String userPath = System.getProperty("user.dir") + File.separator + serverId;
this.persistenceFile = new File(userPath + File.separator + "raftServerMetaData-" + serverId + ".txt");
MyRaftFileUtil.createFile(persistenceFile);
// 读取持久化在磁盘中的数据
RaftServerMetaData raftServerMetaData = readRaftServerMetaData(persistenceFile);
this.currentTerm = raftServerMetaData.getCurrentTerm();
this.votedFor = raftServerMetaData.getVotedFor();
}
public int getCurrentTerm() {
readLock.lock();
try {
return currentTerm;
}finally {
readLock.unlock();
}
}
public void setCurrentTerm(int currentTerm) {
writeLock.lock();
try {
this.currentTerm = currentTerm;
// 更新后数据落盘
persistentRaftServerMetaData(new RaftServerMetaData(this.currentTerm,this.votedFor),persistenceFile);
}finally {
writeLock.unlock();
}
}
public String getVotedFor() {
readLock.lock();
try {
return votedFor;
}finally {
readLock.unlock();
}
}
public void setVotedFor(String votedFor) {
writeLock.lock();
try {
if(Objects.equals(this.votedFor,votedFor)){
// 相等的话就不刷新了
return;
}
this.votedFor = votedFor;
// 更新后数据落盘
persistentRaftServerMetaData(new RaftServerMetaData(this.currentTerm,this.votedFor),persistenceFile);
}finally {
writeLock.unlock();
}
}
private static RaftServerMetaData readRaftServerMetaData(File persistenceFile){
String content = MyRaftFileUtil.getFileContent(persistenceFile);
if(StringUtils.hasText(content)){
return JsonUtil.json2Obj(content,RaftServerMetaData.class);
}else{
return RaftServerMetaData.getDefault();
}
}
private static void persistentRaftServerMetaData(RaftServerMetaData raftServerMetaData, File persistenceFile){
String content = JsonUtil.obj2Str(raftServerMetaData);
MyRaftFileUtil.writeInFile(persistenceFile,content);
}
}
raft的leader选举在论文中有较详细的描述,这里说一下我认为的关键细节.
下面基于源码展开介绍MyRaft是如何实现raft领导者选举的.
大致分为以下几部分:
public class RaftConfig {
/**
* 当前服务节点的id(集群内全局唯一)
* */
private final String serverId;
/**
* 自己节点的配置
* */
private final RaftNodeConfig currentNodeConfig;
/**
* 整个集群所有的服务节点的id集合
* */
private final List<RaftNodeConfig> raftNodeConfigList;
/**
* 集群中多数的值(例如:5节点majorityNum=3,6节点majorityNum=4)
* */
private final int majorityNum;
/**
* 选举超时时间 单位:秒
* */
private int electionTimeout;
/**
* 选举超时时间的随机化区间 单位:毫秒
* */
private Range<Integer> electionTimeoutRandomRange;
/**
* 心跳间隔时间 单位:秒
* */
private int HeartbeatInternal;
}
public class RaftNodeConfig {
private String serverId;
private String ip;
private int port;
}
/**
* Raft服务器的leader选举模块
* */
public class RaftLeaderElectionModule {
private static final Logger logger = LoggerFactory.getLogger(RaftLeaderElectionModule.class);
private final RaftServer currentServer;
/**
* 最近一次接受到心跳的时间
* */
private volatile Date lastHeartbeatTime;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService rpcThreadPool;
public RaftLeaderElectionModule(RaftServer currentServer) {
this.currentServer = currentServer;
this.lastHeartbeatTime = new Date();
this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
this.rpcThreadPool = Executors.newFixedThreadPool(
Math.max(currentServer.getOtherNodeInCluster().size() * 2, 1));
registerHeartbeatTimeoutCheckTaskWithRandomTimeout();
}
/**
* 提交新的延迟任务(带有随机化的超时时间)
* */
public void registerHeartbeatTimeoutCheckTaskWithRandomTimeout(){
int electionTimeout = currentServer.getRaftConfig().getElectionTimeout();
if(currentServer.getCurrentTerm() > 0 && currentServer.getRaftConfig().getDebugElectionTimeout() != null){
// debug的时候多等待一些时间
electionTimeout = currentServer.getRaftConfig().getDebugElectionTimeout();
}
long randomElectionTimeout = getRandomElectionTimeout();
// 选举超时时间的基础上,加上一个随机化的时间
long delayTime = randomElectionTimeout + electionTimeout * 1000L;
logger.debug("registerHeartbeatTimeoutCheckTaskWithRandomTimeout delayTime={}",delayTime);
scheduledExecutorService.schedule(
new HeartbeatTimeoutCheckTask(currentServer,this),delayTime,TimeUnit.MILLISECONDS);
}
/**
* 处理投票请求
* 注意:synchronized修饰防止不同candidate并发的投票申请处理,以FIFO的方式处理
* */
public synchronized RequestVoteRpcResult requestVoteProcess(RequestVoteRpcParam requestVoteRpcParam){
if(this.currentServer.getCurrentTerm() > requestVoteRpcParam.getTerm()){
// Reply false if term < currentTerm (§5.1)
// 发起投票的candidate任期小于当前服务器任期,拒绝投票给它
logger.info("reject requestVoteProcess! term < currentTerm, currentServerId={}",currentServer.getServerId());
return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(),false);
}
// 发起投票的节点任期高于当前节点,无条件投票给它(任期高的说了算)
if(this.currentServer.getCurrentTerm() < requestVoteRpcParam.getTerm()){
// 刷新元数据
this.currentServer.refreshRaftServerMetaData(
new RaftServerMetaData(requestVoteRpcParam.getTerm(),requestVoteRpcParam.getCandidateId()));
// 任期没它高,自己转为follower
this.currentServer.setServerStatusEnum(ServerStatusEnum.FOLLOWER);
return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(),true);
}
// term任期值相同,需要避免同一任期内投票给不同的节点而脑裂
if(this.currentServer.getVotedFor() != null && !this.currentServer.getVotedFor().equals(requestVoteRpcParam.getCandidateId())){
// If votedFor is null or candidateId(取反的卫语句)
// 当前服务器已经把票投给了别人,拒绝投票给发起投票的candidate
logger.info("reject requestVoteProcess! votedFor={},currentServerId={}",
currentServer.getVotedFor(),currentServer.getServerId());
return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(),false);
}
// 投票校验通过,刷新元数据
this.currentServer.refreshRaftServerMetaData(
new RaftServerMetaData(requestVoteRpcParam.getTerm(),requestVoteRpcParam.getCandidateId()));
this.currentServer.processCommunicationHigherTerm(requestVoteRpcParam.getTerm());
return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(),true);
}
public void refreshLastHeartbeatTime(){
// 刷新最新的接受到心跳的时间
this.lastHeartbeatTime = new Date();
// 接受新的心跳,说明现在leader是存活的,清理掉之前的投票信息
this.currentServer.cleanVotedFor();
}
private long getRandomElectionTimeout(){
long min = currentServer.getRaftConfig().getElectionTimeoutRandomRange().getLeft();
long max = currentServer.getRaftConfig().getElectionTimeoutRandomRange().getRight();
// 生成[min,max]范围内随机整数的通用公式为:n=rand.nextInt(max-min+1)+min。
return ThreadLocalRandom.current().nextLong(max-min+1) + min;
}
}
/**
* 心跳超时检查任务
* */
public class HeartbeatTimeoutCheckTask implements Runnable{
private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimeoutCheckTask.class);
private final RaftServer currentServer;
private final RaftLeaderElectionModule raftLeaderElectionModule;
public HeartbeatTimeoutCheckTask(RaftServer currentServer, RaftLeaderElectionModule raftLeaderElectionModule) {
this.currentServer = currentServer;
this.raftLeaderElectionModule = raftLeaderElectionModule;
}
@Override
public void run() {
if(currentServer.getServerStatusEnum() == ServerStatusEnum.LEADER){
// leader是不需要处理心跳超时的
// 注册下一个心跳检查任务
raftLeaderElectionModule.registerHeartbeatTimeoutCheckTaskWithRandomTimeout();
}else{
try {
doTask();
}catch (Exception e){
logger.info("do HeartbeatTimeoutCheckTask error! ignore",e);
}
// 注册下一个心跳检查任务
raftLeaderElectionModule.registerHeartbeatTimeoutCheckTaskWithRandomTimeout();
}
}
private void doTask(){
logger.debug("do HeartbeatTimeoutCheck start {}",currentServer.getServerId());
int electionTimeout = currentServer.getRaftConfig().getElectionTimeout();
// 当前时间
Date currentDate = new Date();
Date lastHeartbeatTime = raftLeaderElectionModule.getLastHeartbeatTime();
long diffTime = currentDate.getTime() - lastHeartbeatTime.getTime();
logger.debug("currentDate={}, lastHeartbeatTime={}, diffTime={}, serverId={}",
currentDate,lastHeartbeatTime,diffTime,currentServer.getServerId());
// 心跳超时判断
if(diffTime > (electionTimeout * 1000L)){
logger.info("HeartbeatTimeoutCheck check fail, trigger new election! serverId={}",currentServer.getServerId());
// 触发新的一轮选举
triggerNewElection();
}else{
// 认定为心跳正常,无事发生
logger.debug("HeartbeatTimeoutCheck check success {}",currentServer.getServerId());
}
logger.debug("do HeartbeatTimeoutCheck end {}",currentServer.getServerId());
}
private void triggerNewElection(){
logger.info("HeartbeatTimeoutCheck check fail, trigger new election! serverId={}",currentServer.getServerId());
// 距离最近一次接到心跳已经超过了选举超时时间,触发新一轮选举
// 当前服务器节点当前任期自增1
currentServer.setCurrentTerm(currentServer.getCurrentTerm()+1);
// 自己发起选举,先投票给自己
currentServer.setVotedFor(currentServer.getServerId());
// 角色转变为CANDIDATE候选者
currentServer.setServerStatusEnum(ServerStatusEnum.CANDIDATE);
// 并行的发送请求投票的rpc给集群中的其它节点
List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();
List<Future<RequestVoteRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());
// 构造请求参数
RequestVoteRpcParam requestVoteRpcParam = new RequestVoteRpcParam();
requestVoteRpcParam.setTerm(currentServer.getCurrentTerm());
requestVoteRpcParam.setCandidateId(currentServer.getServerId());
for(RaftService node : otherNodeInCluster){
Future<RequestVoteRpcResult> future = raftLeaderElectionModule.getRpcThreadPool().submit(
()-> {
RequestVoteRpcResult rpcResult = node.requestVote(requestVoteRpcParam);
// 收到更高任期的处理
currentServer.processCommunicationHigherTerm(rpcResult.getTerm());
return rpcResult;
}
);
futureList.add(future);
}
List<RequestVoteRpcResult> requestVoteRpcResultList = CommonUtil.concurrentGetRpcFutureResult(
"requestVote", futureList,
raftLeaderElectionModule.getRpcThreadPool(),1,TimeUnit.SECONDS);
// 获得rpc响应中决定投票给自己的总票数(算上自己的1票)
int getRpcVoted = (int) requestVoteRpcResultList.stream().filter(RequestVoteRpcResult::isVoteGranted).count()+1;
logger.info("HeartbeatTimeoutCheck election, getRpcVoted={}, currentServerId={}",getRpcVoted,currentServer.getServerId());
// 是否获得大多数的投票
boolean majorVoted = getRpcVoted >= this.currentServer.getRaftConfig().getMajorityNum();
if(majorVoted){
logger.info("HeartbeatTimeoutCheck election result: become a leader! {}, currentTerm={}",currentServer.getServerId(),currentServer.getCurrentTerm());
// 票数过半成功当选为leader
currentServer.setServerStatusEnum(ServerStatusEnum.LEADER);
currentServer.setCurrentLeader(currentServer.getServerId());
// 成为leader后立马发送一次心跳,抑制其它节点发起新的一轮选举
// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server;
// repeat during idle periods to prevent election timeouts (§5.2)
HeartbeatBroadcastTask.doHeartbeatBroadcast(currentServer);
}else{
// 票数不过半,无法成为leader
logger.info("HeartbeatTimeoutCheck election result: not become a leader! {}",currentServer.getServerId());
}
this.currentServer.cleanVotedFor();
}
}
public class CommonUtil {
private static final Logger logger = LoggerFactory.getLogger(CommonUtil.class);
/**
* 并发的获得future列表的结果
* */
public static <T> List<T> concurrentGetRpcFutureResult(
String info, List<Future<T>> futureList, ExecutorService threadPool, long timeout, TimeUnit timeUnit){
CountDownLatch countDownLatch = new CountDownLatch(futureList.size());
List<T> resultList = new ArrayList<>(futureList.size());
for(Future<T> futureItem : futureList){
threadPool.execute(()->{
try {
logger.debug(info + " concurrentGetRpcFutureResult start!");
T result = futureItem.get(timeout,timeUnit);
logger.debug(info + " concurrentGetRpcFutureResult end!");
resultList.add(result);
} catch (Exception e) {
// rpc异常不考虑
logger.error( "{} getFutureResult error!",info,e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new MyRaftException("getFutureResult error!",e);
}
return resultList;
}
}
/**
* Raft服务器的心跳广播模块
* */
public class RaftHeartbeatBroadcastModule {
private final RaftServer currentServer;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService rpcThreadPool;
public RaftHeartbeatBroadcastModule(RaftServer currentServer) {
this.currentServer = currentServer;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.rpcThreadPool = Executors.newFixedThreadPool(
Math.max(currentServer.getOtherNodeInCluster().size() * 2, 1));
int HeartbeatInternal = currentServer.getRaftConfig().getHeartbeatInternal();
// 心跳广播任务需要以固定频率执行(scheduleAtFixedRate)
scheduledExecutorService.scheduleAtFixedRate(
new HeartbeatBroadcastTask(currentServer,this), 0, HeartbeatInternal, TimeUnit.SECONDS);
}
}
/**
* leader心跳广播任务
* */
public class HeartbeatBroadcastTask implements Runnable{
private static final Logger logger = LoggerFactory.getLogger(HeartbeatBroadcastTask.class);
private final RaftServer currentServer;
private final RaftHeartbeatBroadcastModule raftHeartbeatBroadcastModule;
private int HeartbeatCount = 0;
public HeartbeatBroadcastTask(RaftServer currentServer, RaftHeartbeatBroadcastModule raftHeartbeatBroadcastModule) {
this.currentServer = currentServer;
this.raftHeartbeatBroadcastModule = raftHeartbeatBroadcastModule;
}
@Override
public void run() {
if(currentServer.getServerStatusEnum() != ServerStatusEnum.LEADER){
// 只有leader才需要广播心跳
return;
}
// 心跳广播
doHeartbeatBroadcast(currentServer);
}
/**
* 做心跳广播
* @return 是否大多数节点依然认为自己是leader
* */
public static boolean doHeartbeatBroadcast(RaftServer currentServer){
logger.info("do HeartbeatBroadcast start {}",currentServer.getServerId());
// 先刷新自己的心跳时间
currentServer.getRaftLeaderElectionModule().refreshLastHeartbeatTime();
// 并行的发送心跳rpc给集群中的其它节点
List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();
List<Future<AppendEntriesRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());
// 构造请求参数(心跳rpc,entries为空)
AppendEntriesRpcParam appendEntriesRpcParam = new AppendEntriesRpcParam();
appendEntriesRpcParam.setTerm(currentServer.getCurrentTerm());
appendEntriesRpcParam.setLeaderId(currentServer.getServerId());
for(RaftService node : otherNodeInCluster){
Future<AppendEntriesRpcResult> future = currentServer.getRaftHeartbeatBroadcastModule().getRpcThreadPool().submit(
()-> {
AppendEntriesRpcResult rpcResult = node.appendEntries(appendEntriesRpcParam);
// rpc交互时任期高于当前节点任期的处理
currentServer.processCommunicationHigherTerm(rpcResult.getTerm());
return rpcResult;
}
);
futureList.add(future);
}
List<AppendEntriesRpcResult> appendEntriesRpcResultList = CommonUtil.concurrentGetRpcFutureResult("doHeartbeatBroadcast",futureList,
currentServer.getRaftHeartbeatBroadcastModule().getRpcThreadPool(),1, TimeUnit.SECONDS);
// 通知成功的数量(+1包括自己)
int successResponseCount = (int) (appendEntriesRpcResultList.stream().filter(AppendEntriesRpcResult::isSuccess).count() + 1);
if(successResponseCount >= currentServer.getRaftConfig().getMajorityNum()
&& currentServer.getServerStatusEnum() == ServerStatusEnum.LEADER){
// 大多数节点依然认为自己是leader,并且广播的节点中没有人任期高于当前节点,让当前节点主动让位
return true;
}else{
// 大多数节点不认为自己是leader(包括广播超时等未接到响应的场景,也认为是广播失败)
return false;
}
}
}
处理requestVote请求 。
处理心跳请求 。
public class RaftServer implements RaftService {
private static final Logger logger = LoggerFactory.getLogger(RaftServer.class);
/**
* 当前服务节点的id(集群内全局唯一)
* */
private final String serverId;
/**
* Raft服务端配置
* */
private final RaftConfig raftConfig;
/**
* 当前服务器的状态
* */
private volatile ServerStatusEnum serverStatusEnum;
/**
* raft服务器元数据(当前任期值currentTerm、当前投票给了谁votedFor)
* */
private final RaftServerMetaDataPersistentModule raftServerMetaDataPersistentModule;
/**
* 当前服务认为的leader节点的Id
* */
private volatile String currentLeader;
/**
* 集群中的其它raft节点服务
* */
protected List<RaftService> otherNodeInCluster;
private RaftLeaderElectionModule raftLeaderElectionModule;
private RaftHeartbeatBroadcastModule raftHeartbeatBroadcastModule;
public RaftServer(RaftConfig raftConfig) {
this.serverId = raftConfig.getServerId();
this.raftConfig = raftConfig;
// 初始化时都是follower
this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
// 服务器元数据模块
this.raftServerMetaDataPersistentModule = new RaftServerMetaDataPersistentModule(raftConfig.getServerId());
}
public void init(List<RaftService> otherNodeInCluster){
// 集群中的其它节点服务
this.otherNodeInCluster = otherNodeInCluster;
raftLeaderElectionModule = new RaftLeaderElectionModule(this);
raftHeartbeatBroadcastModule = new RaftHeartbeatBroadcastModule(this);
logger.info("raft server init end! otherNodeInCluster={}, currentServerId={}",otherNodeInCluster,serverId);
}
@Override
public RequestVoteRpcResult requestVote(RequestVoteRpcParam requestVoteRpcParam) {
RequestVoteRpcResult requestVoteRpcResult = raftLeaderElectionModule.requestVoteProcess(requestVoteRpcParam);
processCommunicationHigherTerm(requestVoteRpcParam.getTerm());
logger.info("do requestVote requestVoteRpcParam={},requestVoteRpcResult={}, currentServerId={}",
requestVoteRpcParam,requestVoteRpcResult,this.serverId);
return requestVoteRpcResult;
}
@Override
public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
if(appendEntriesRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm()){
// Reply false if term < currentTerm (§5.1)
// 拒绝处理任期低于自己的老leader的请求
logger.info("doAppendEntries term < currentTerm");
return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
}
if(appendEntriesRpcParam.getTerm() >= this.raftServerMetaDataPersistentModule.getCurrentTerm()){
// appendEntries请求中任期值如果大于自己,说明已经有一个更新的leader了,自己转为follower,并且以对方更大的任期为准
this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
this.currentLeader = appendEntriesRpcParam.getLeaderId();
this.raftServerMetaDataPersistentModule.setCurrentTerm(appendEntriesRpcParam.getTerm());
}
// 来自leader的心跳处理,清理掉之前选举的votedFor
this.cleanVotedFor();
// entries为空,说明是心跳请求,刷新一下最近收到心跳的时间
raftLeaderElectionModule.refreshLastHeartbeatTime();
// 心跳请求,直接返回
return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),true);
}
}
在工程的test目录下,可以启动一个5节点的MyRaft的服务集群(用main方法启动即可),通过修改其中的RaftClusterGlobalConfig类可以修改相关的配置.
public class RaftClusterGlobalConfig {
public static Registry registry = RegistryFactory.getRegistry(
new RegistryConfig(RegistryCenterTypeEnum.FAKE_REGISTRY.getCode(), "127.0.0.1:2181"));
/**
* raft的集群配置
* */
public static final List<RaftNodeConfig> raftNodeConfigList = Arrays.asList(
new RaftNodeConfig("raft-1","127.0.0.1",8001)
,new RaftNodeConfig("raft-2","127.0.0.1",8002)
,new RaftNodeConfig("raft-3","127.0.0.1",8003)
,new RaftNodeConfig("raft-4","127.0.0.1",8004)
,new RaftNodeConfig("raft-5","127.0.0.1",8005)
);
public static final int electionTimeout = 3;
public static final Integer debugElectionTimeout = null;
public static final int HeartbeatInterval = 1;
/**
* N次心跳后,leader会自动模拟出现故障(退回follow,停止心跳广播)
* N<=0代表不触发自动模拟故障
*/
public static final int leaderAutoFailCount = 0;
/**
* 随机化的选举超时时间(毫秒)
* */
public static final Range<Integer> electionTimeoutRandomRange = new Range<>(150,500);
public static void initRaftRpcServer(String serverId){
RaftNodeConfig currentNodeConfig = RaftClusterGlobalConfig.raftNodeConfigList
.stream().filter(item->item.getServerId().equals(serverId)).findAny()
.orElseThrow(() -> new MyRaftException("serverId must in raftNodeConfigList"));
List<RaftNodeConfig> otherNodeList = RaftClusterGlobalConfig.raftNodeConfigList
.stream().filter(item->!item.getServerId().equals(serverId)).collect(Collectors.toList());
RaftConfig raftConfig = new RaftConfig(
currentNodeConfig,RaftClusterGlobalConfig.raftNodeConfigList);
raftConfig.setElectionTimeout(RaftClusterGlobalConfig.electionTimeout);
raftConfig.setDebugElectionTimeout(RaftClusterGlobalConfig.debugElectionTimeout);
raftConfig.setHeartbeatInternal(RaftClusterGlobalConfig.HeartbeatInterval);
raftConfig.setLeaderAutoFailCount(RaftClusterGlobalConfig.leaderAutoFailCount);
// 随机化选举超时时间的范围
raftConfig.setElectionTimeoutRandomRange(RaftClusterGlobalConfig.electionTimeoutRandomRange);
RaftRpcServer raftRpcServer = new RaftRpcServer(raftConfig, RaftClusterGlobalConfig.registry);
List<RaftService> raftServiceList = raftRpcServer.getRpcProxyList(otherNodeList);
// raft服务,启动!
raftRpcServer.init(raftServiceList);
}
}
验证lab1中MyRaft leader选举实现的正确性,可以通过以下几个case简单的验证下:
在原始的raft算法的leader选举中存在一个问题。具体场景举例如下:
从本质上来说,这个分区恢复后进行的新选举是无意义的。且由于进行选举会造成集群短暂的不可用,因此最好能避免这个问题.
业界给出的解决方法是在真正的选举前先发起一轮预选举(preVote).
MyRaft为了保持实现的简单性,并没有实现预选举机制。但etcd、sofa-jraft等流行的开源raft系统都是实现了预选举优化的,所以在这里还是简单介绍一下.
最后此篇关于手写raft(一)实现leader选举的文章就讲到这里了,如果你想了解更多关于手写raft(一)实现leader选举的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
背景: 我最近一直在使用 JPA,我为相当大的关系数据库项目生成持久层的轻松程度给我留下了深刻的印象。 我们公司使用大量非 SQL 数据库,特别是面向列的数据库。我对可能对这些数据库使用 JPA 有一
我已经在我的 maven pom 中添加了这些构建配置,因为我希望将 Apache Solr 依赖项与 Jar 捆绑在一起。否则我得到了 SolarServerException: ClassNotF
interface ITurtle { void Fight(); void EatPizza(); } interface ILeonardo : ITurtle {
我希望可用于 Java 的对象/关系映射 (ORM) 工具之一能够满足这些要求: 使用 JPA 或 native SQL 查询获取大量行并将其作为实体对象返回。 允许在行(实体)中进行迭代,并在对当前
好像没有,因为我有实现From for 的代码, 我可以转换 A到 B与 .into() , 但同样的事情不适用于 Vec .into()一个Vec . 要么我搞砸了阻止实现派生的事情,要么这不应该发
在 C# 中,如果 A 实现 IX 并且 B 继承自 A ,是否必然遵循 B 实现 IX?如果是,是因为 LSP 吗?之间有什么区别吗: 1. Interface IX; Class A : IX;
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在阅读标准haskell库的(^)的实现代码: (^) :: (Num a, Integral b) => a -> b -> a x0 ^ y0 | y0 a -> b ->a expo x0
我将把国际象棋游戏表示为 C++ 结构。我认为,最好的选择是树结构(因为在每个深度我们都有几个可能的移动)。 这是一个好的方法吗? struct TreeElement{ SomeMoveType
我正在为用户名数据库实现字符串匹配算法。我的方法采用现有的用户名数据库和用户想要的新用户名,然后检查用户名是否已被占用。如果采用该方法,则该方法应该返回带有数据库中未采用的数字的用户名。 例子: “贾
我正在尝试实现 Breadth-first search algorithm , 为了找到两个顶点之间的最短距离。我开发了一个 Queue 对象来保存和检索对象,并且我有一个二维数组来保存两个给定顶点
我目前正在 ika 中开发我的 Python 游戏,它使用 python 2.5 我决定为 AI 使用 A* 寻路。然而,我发现它对我的需要来说太慢了(3-4 个敌人可能会落后于游戏,但我想供应 4-
我正在寻找 Kademlia 的开源实现C/C++ 中的分布式哈希表。它必须是轻量级和跨平台的(win/linux/mac)。 它必须能够将信息发布到 DHT 并检索它。 最佳答案 OpenDHT是
我在一本书中读到这一行:-“当我们要求 C++ 实现运行程序时,它会通过调用此函数来实现。” 而且我想知道“C++ 实现”是什么意思或具体是什么。帮忙!? 最佳答案 “C++ 实现”是指编译器加上链接
我正在尝试使用分支定界的 C++ 实现这个背包问题。此网站上有一个 Java 版本:Implementing branch and bound for knapsack 我试图让我的 C++ 版本打印
在很多情况下,我需要在 C# 中访问合适的哈希算法,从重写 GetHashCode 到对数据执行快速比较/查找。 我发现 FNV 哈希是一种非常简单/好/快速的哈希算法。但是,我从未见过 C# 实现的
目录 LRU缓存替换策略 核心思想 不适用场景 算法基本实现 算法优化
1. 绪论 在前面文章中提到 空间直角坐标系相互转换 ,测绘坐标转换时,一般涉及到的情况是:两个直角坐标系的小角度转换。这个就是我们经常在测绘数据处理中,WGS-84坐标系、54北京坐标系
在软件开发过程中,有时候我们需要定时地检查数据库中的数据,并在发现新增数据时触发一个动作。为了实现这个需求,我们在 .Net 7 下进行一次简单的演示. PeriodicTimer .
二分查找 二分查找算法,说白了就是在有序的数组里面给予一个存在数组里面的值key,然后将其先和数组中间的比较,如果key大于中间值,进行下一次mid后面的比较,直到找到相等的,就可以得到它的位置。
我是一名优秀的程序员,十分优秀!