gpt4 book ai didi

SOFAJRaft源码阅读(肆)-Netty时间轮算法的实践

转载 作者:我是一只小鸟 更新时间:2023-01-29 22:38:15 25 4
gpt4 key购买 nike

SOFAJRaft的定时任务调度器是基于Netty来实现的,所以本文将会基于Netty时间轮算法,然后再结合SOFAJRaft源码进行分析。 @Author:Akai-yuan @更新时间:2023/1/29 。

1.HashedWheelTimer概览

一个时间轮算法的组成成分图: 一个基于Netty实现的时间轮(HashedWheelTimer)有三个核心部分:HashedWheelTimeout(任务封装)、HashedWheelBucket(任务链表)、Worker(任务执行线程) 。

属性概览

HashedWheelTimer字段的具体作用全部以注释的形式标记在以下代码块中。 我们可以先看看HashedWheelTimer的属性,看不懂没有关系,可以先大致了解一下一个时间轮的属性有些什么.

                        
                          	//日志
	private static final Logger LOG = LoggerFactoryLoggerFactory.getLogger(HashedWheelTimer.class);
    //实例数量限制为256
	private static final int INSTANCE_COUNT_LIMIT   = 256;
	//实例计数器
    private static final AtomicInteger instanceCounter = new AtomicInteger();
	//超过实例最大数量后是否警告标识,配合INSTANCE_COUNT_LIMIT字段使用
    private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean();
	//原子更新字段类,用于原子更新workerState属性
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = 
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState");
	//继承自Runnable,Worker是整个时间轮的执行流程管理者
    private final Worker worker = new Worker();
	//工作线程
    private final Thread workerThread;
	//工作状态码常量类【0 - init, 1 - started, 2 - shut down】
    public static final int WORKER_STATE_INIT      = 0;
    public static final int WORKER_STATE_STARTED   = 1;
    public static final int WORKER_STATE_SHUTDOWN  = 2;
	//工作状态码
    @SuppressWarnings({ "unused", "FieldMayBeFinal" })
    private volatile int workerState;
 	// tick的时长,也就是指针多久转一格
    private final long tickDuration;
	//时间轮数组,每个位置是一个HashedWheelBucket
    private final HashedWheelBucket[] wheel;
	//寻址标识符用于快速寻址
	//公式:mask==wheel.length-1
	//原理:当x=2^n(n为自然数)时 a%x=a&(x-1)
    private final int mask;
    //一个等待startTime初始化的计数器
    private final CountDownLatch startTimeInitialized   = new CountDownLatch(1);
	//用来暂时存放待加入时间轮的任务的队列
    private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<>();
	//用来暂时存放已被取消的任务的队列
    private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedQueue<>();
    //未执行任务的计数器
	private final AtomicLong pendingTimeouts = new AtomicLong(0);
    //未执行任务的最大数量
	private final long maxPendingTimeouts;
    //开始时间
	private volatile long startTime;

                        
                      

构造器概览

这个构造器里面主要做一些初始化的工作.

  1. 初始化一个数组长度为2048的Wheel时间轮。由于传入的数组长度可能为Big Number,所以我去SOFAJRaft上提了一个issue,建议采用JAVA8-HashMap的相关实现来完善该算法,可见于: ISSUE-时间轮算法存在多循环低效率问题
  2. 初始化mask,用来快速计算槽位的下标。
  3. 初始化tickDuration并转化成纳秒
  4. 校验整个时间轮走完的时间不能过长
  5. 将worker包装成thread
  6. 限制HashedWheelTimer实例数量
                        
                          public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                            long maxPendingTimeouts) {
    	//判空
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        //将ticksPerWheel规格化为2的幂,并初始化轮子
        wheel = createWheel(ticksPerWheel);
        //寻址标识符
        mask = wheel.length - 1;
        //将tickDuration(时间单位为unit)转换为纳秒
        this.tickDuration = unit.toNanos(tickDuration);
        //防止溢出,指针转动的时间间隔不能超过:Long.MAX_VALUE/wheel.length
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
 			"tickDuration: %d (expected: 0 < tickDuration in nanos < %d", 
                tickDuration, Long.MAX_VALUE/ wheel.length));
        }
    	//将worker包装成thread
        workerThread = threadFactory.newThread(worker);
    	//默认-1
        this.maxPendingTimeouts = maxPendingTimeouts;
    	//如果HashedWheelTimer实例太多,会打印error日志
        if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
            && warnedTooManyInstances.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

                        
                      

实现原理图:

HashedWheelTimer是整个时间轮算法的核心类,通过指定的Hash规则将不同TimeOut定时任务划分到HashedWheelBucket进行管理,而HashedWheelBucket利用双向链表结构维护了某一时刻需要执行的定时任务列表.

接上文 SOFAJRaft源码阅读-模块启动过程 ,我们知道,在NodeImpl#init方法中,构造了多个RepeatedTimer实例:voteTimer、electionTimer、stepDownTimer、snapshotTimer。并且重写了RepeatedTimer#onTrigger和RepeatedTimer#adjustTimeout两个方法。紧接着,NodeImpl中的多个方法(如:init、electSelf、becomeLeader)会对这些RepeatedTimer实例调用RepeatedTimer#start方法启动.

2.启动计时器

  • 加ReentrantLock锁,保证只能一个线程调用这个方法
  • 启动状态参数校验
  • 调用RepeatedTimer#schedule方法
  • 释放锁
                        
                              public void start() {
        this.lock.lock();
        try {
            if (this.destroyed) {
                return;
            }
            if (!this.stopped) {
                return;
            }
            this.stopped = false;
            if (this.running) {
                return;
            }
            this.running = true;
            schedule();
        } finally {
            this.lock.unlock();
        }
    }

                        
                      

3.任务调度

RepeatedTimer#start中会调用RepeatedTimer#schedule:

  • 如果RepeatedTimer中维护的HashedWheelTimeout(任务)不为空,则取消(HashedWheelTimer#cancel)该任务。
  • 声明一个TimerTask,并通过HashedWheelTimer#newTimeout()构造一个HashedWheelTimeout
                        
                          private void schedule() {
        if (this.timeout != null) {
            this.timeout.cancel();
        }
        final TimerTask timerTask = timeout -> {
            try {
                //执行onTrigger,并设置状态参数
                RepeatedTimer.this.run();
            } catch (final Throwable t) {
                LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t);
            }
        };
        this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS);
    }

                        
                      

HashedWheelTimer#cancel: 取消一个任务,并将其放入另一个cancelledTimeouts队列 。

                        
                          public boolean cancel() {
    		//只更新将在下一刻从HashedWheelBucket中删除的状态
           	if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }
    		//如果一个任务应该被取消,我们将其放入另一个cancelledTimeouts队列,该队列将在每次tick时处理。
			//因此,这意味着我们将有一个GC延迟,最大为1个tick的持续时间,这已经足够好了。
    		//这样,我们可以再次使用MpscLinkedQueue,从而尽可能减少锁定/开销。
            timer.cancelledTimeouts.add(this);
            return true;
        }

                        
                      

4. 往时间轮内添加任务

重点理解HashedWheelTimer#newTimeout:

  • 判空
  • 校验pendingTimeoutsCount参数的合理性,如果maxPendingTimeouts(最大的等待加入的任务的数量)为0或负数,则表示不需要对 pendingTimeoutsCount 进行数量限制,否则会进行比较,超过限制则会抛出异常。
  • 调用HashedWheelTimer#start()启动时间轮。
  • 计算当前添加任务的执行时间。传入的delay参数由RepeatedTimer#adjustTimeout(this.timeoutMs)获取
  • 防溢出操作
  • 最后将任务加入队列,此时还未加入到时间轮中,需要 等待时钟拨动 (也就是当调用链路 HashedWheelTimer#start->workerThread#start->Worker#run->waitForNextTick 返回了参数时)才会触发往时间轮内添加任务
                        
                              public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        //等待的任务数 +1
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // 如果时间轮内等待的任务数大于最大值,任务会被抛弃
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                        + ") is greater than or equal to maximum allowed pending "
                        + "timeouts (" + maxPendingTimeouts + ")");
        }
        // 开启时间轮内的线程
        start();
        // 计算当前添加任务的执行时间
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        // 防止溢出
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 将任务加入队列(注意,此时还未加入到时间轮中)
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

                        
                      

5.开启时间轮内的线程

我们知道,新添加的任务会先保存在timeouts队列中,当时间轮的时钟拨动时才会判断是否将队列中的任务加载进时间轮。那么工作线程开启后, start() 方法会被阻塞 ,等工作线程(workerThread.start())的 startTime 属性初始化完成后才被唤醒。 因为上面的 newTimeout 方法在线程开启后【start()】,需要计算当前添加进来任务的执行时间【long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;】,而这个执行时间是根据 startTime 计算的。 HashedWheelTimer#start:

  • 判断当前时间轮的状态,如果是初始化,则启动worker线程,启动整个时间轮;如果已经启动则略过;如果是已经停止,则报错。
  • 如果初始化未完成,则需要等待worker线程完成startTime的初始化
                        
                              public void start() {
    // Lock Free设计。可能有多个线程调用启动方法,这里使用AtomicIntegerFieldUpdater原子的更新时间轮的状态,
    // 它是JUC里面的类,利用反射进行原子操作。有比AtomicInteger更好的性能和更低得内存占用
        switch (workerStateUpdater.get(this)) {
            case WORKER_STATE_INIT:
                if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }
    	// startTimeInitialized 是一个 CountDownLatch,目的是为了保证工作线程的 startTime 属性初始化
        // startTime的初始化和startTimeInitialized.countDown()方法会在Worker#run
        // [也就是workerThread.start()中]完成
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

                        
                      

6. 时间轮调度

时间轮每拨动一次就会触发tick++,然后tick与mask(时间轮数组长度 - 1)进行 & 运算,可以快速定位时间轮数组内的槽【mask寻址标识符用于快速寻址,其原理:当x=2^n(n为自然数)时 a%x=a&(x-1)】。因为 tick 值一直在增加,所以时间轮数组看起来就像一个不断循环的圆.

  • 先初始化 startTime 值,因为后面任务执行的时间是根据 startTime 计算的
  • 时钟拨动,如果时间未到,则 sleep 一会儿
  • 处理过期的任务
  • 将任务加载进时间轮
  • 执行当前时钟对应时间轮内的任务
  • 时间轮关闭,将所有未执行的任务封装到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
  • 处理过期的任务
                        
                          public void run() {
            // 初始化 startTime
            startTime = System.nanoTime();
            if (startTime == 0) {
                startTime = 1;
            }

            // 用来唤醒被阻塞的 HashedWheelTimer#start() 方法,保证 startTime 初始化
            startTimeInitialized.countDown();

            do {
                // 时钟拨动,有返回值(必须是正数)的时候说明可以拨动时钟了
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    // 处理过期的任务
                    processCancelledTasks();
                    HashedWheelBucket bucket = wheel[idx];
                    // 将任务加载进时间轮
                    transferTimeoutsToBuckets();
                    // 执行当前时间轮槽内的任务
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // 时间轮关闭,将还未执行的任务以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
            // 还未执行的任务可能会在两个地方,一:时间轮数组内,二:队列中
            for (HashedWheelBucket bucket : wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

                        
                      

7. 时间拨动

Worker#waitForNextTick:

  • 当时钟拨动一次后,应该计算下一次时钟拨动的时间
  • 获取当前时间的相对时间
  • 计算距离时钟下次拨动的时间,也就是sleepTimeMs。这里之所以 加 999999 后再除 10000000 , 是为了保证足够的 sleep 时间。例如:当 deadline - currentTime = 2000002 的时候,如果不加 999999,则只睡了 2ms。而 2ms 其实是未到达 deadline 时间点的,所以为了使上述情况能 sleep 足够的时间,加上 999999 后,会多睡 1ms。
  • 如果还没到就 sleep 一会儿,等到拨动时间再醒来。
  • 进入下一次循环,直到sleepTimeMs<=0 说明可以拨动时钟了
                        
                                  private long waitForNextTick() {
            // 计算时钟下次拨动的相对时间
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                // 获取当前时间的相对时间
                final long currentTime = System.nanoTime() - startTime;
                // 计算距离时钟下次拨动的时间
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
            	// <=0 说明可以拨动时钟了
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }
            	// sleep 到下次时钟拨动
                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

                        
                      

8. 移除取消的任务

Worker#processCancelledTasks:

  • 遍历cancelledTimeouts中所有实例并从其对应HashedWheelBucket中移除。
  • 在调用HashedWheelTimer的stop方法的时候会将要取消的HashedWheelTimeout实例放入到cancelledTimeouts队列中,所以这里只需要循环把队列中的数据取出来,然后调用HashedWheelTimeout的remove方法将自己在bucket移除就好了。
                        
                                  private void processCancelledTasks() {
            //遍历cancelledTimeouts中所有实例并从其对应HashedWheelBucket中移除
            for (;;) {
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                try {
                    timeout.remove();
                } catch (Throwable t) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("An exception was thrown while process a cancellation task", t);
                    }
                }
            }
        }

                        
                      

9. 将任务从队列加载进时间轮

Worker#transferTimeoutsToBuckets: 在上面也提到过,任务刚加进来不会立即到时间轮中去,而是暂时保存到一个队列中,当时间轮时钟拨动时,会将任务从队列中加载进时间轮内.

  • 每次调用这个方法会处理10w个任务,以免阻塞worker线程
  • 从timeouts中取出任务
  • 在校验之后会用timeout的deadline除以每次tick运行的时间tickDuration得出需要经过多少次时钟拨动才会运行这个timeout的任务
  • 计算时间轮拨动的圈数。由于timeout的deadline实际上还包含了worker线程启动到timeout加入队列这段时间,所以在算remainingRounds的时候需要减去当前的tick次数。
  • 将任务加载进时间轮对应的槽内,可能有多个任务经过 hash 计算后定位到同一个槽,这些任务会以双向链表的结构保存,有点类似 HashMap 处理碰撞的情况。
                        
                                  private void transferTimeoutsToBuckets() {
            // 一次最多只处理队列中的 100000 个任务
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                // 过滤已经取消的任务
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }
				// 计算当前任务到执行还需要经过几次时钟拨动
                // 假设时间轮数组大小是 10,calculated 为 12,需要时间轮转动一圈加两次时钟拨动后后才能执行这个任务,因此还需要计算一下圈数
                long calculated = timeout.deadline / tickDuration;
                // 计算当前任务到执行还需要经过几圈时钟拨动
                timeout.remainingRounds = (calculated - tick) / wheel.length;
				// 有的任务可能在队列里很长时间,时间过期了也没有被调度,将这种情况的任务放在当前轮次内执行
                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                // 计算任务在时间轮数组中的槽
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                // 将任务放到时间轮的数组中,多个任务可能定位时间轮的同一个槽,这些任务通过以链表的形式链接
                bucket.addTimeout(timeout);
            }
        }

                        
                      

10. 执行任务

HashedWheelBucket#expireTimeouts: 时间轮槽内的任务以链表形式存储,这些任务执行的时间可能会不一样,有的在当前时钟执行,有的在下一圈或者之后几圈对应的时钟才会执行。当任务在当前时钟执行时,需要将这个任务从链表中删除,重新维护链表关系.

                        
                                  public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                // 任务执行的圈数 > 0,表示任务还需要经过 remainingRounds 圈时钟循环才能执行
                if (timeout.remainingRounds <= 0) {
                    // 从链表中移除当前任务,并返回链表中下一个任务
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 执行任务
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
                            timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    // 过滤取消的任务
                    next = remove(timeout);
                } else {
                    // 圈数 -1
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }

                        
                      

HashedWheelTimeout#expire:

  • CAS切换任务状态
  • task.run(this)执行任务
                        
                                  public void expire() {
            // CAS任务状态变换
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                task.run(this);
            } catch (Throwable t) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

                        
                      

11. 终止时间轮

HashedWheelTimer#stop(): 触发终止时间轮的链路为:NodeImpl#destroyAllTimers()->RepeatedTimer#destroy()->timer.stop() 。

  • 当终止时间轮时,将时间轮的状态修改为 WORKER_STATE_SHUTDOWN 。时间轮状态有两种情况:(1) WORKER_STATE_INIT :当初始化时间轮对象时并不会立即开启时间轮工作线程,而是第一次添加任务时才开启,此状态表示时间轮没有处理过任务(2) WORKER_STATE_STARTED :时间轮在工作,这里也有两种情况,存在并发与不存在并发,如果多个线程都尝试终止时间轮,肯定只能有一个成功。
  • 时间轮停止运行后会将未执行的任务返回出去,至于怎么处理这些任务,由业务方自己定义,这个流程和线程池的 shutdownNow 方法是类似的。
  • 如果时间轮在运行,如果时间轮处于非运行状态,会把时间轮数组与队列中未执行且未取消的任务保存到 unprocessedTimeouts 集合中。而终止时间轮成功的线程只需要等待一会儿,这个等待通过 workerThread.join(100)实现。
                        
                              public Set<Timeout> stop() {
        // 终止时间轮的线程不能是时间轮的工作线程
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
                                            + TimerTask.class.getSimpleName());
        }
        // 将时间轮的状态修改为 WORKER_STATE_SHUTDOWN,这里有两种情况
        // 一:时间轮是 WORKER_STATE_INIT 状态,表明时间轮从创建到终止一直没有任务进来
        // 二:时间轮是 WORKER_STATE_STARTED 状态,多个线程尝试终止时间轮,只有一个操作成功
        if (!workerStateUpdater.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // 代码走到这里,时间轮只能是两种状态中的一个,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN
            // 为 WORKER_STATE_INIT 表示时间轮没有任务,因此不用返回未处理的任务,但是需要将时间轮实例 -1
            // 为 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失败,什么都不用做,因为 CAS 成功的线程会处理
            if (workerStateUpdater.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                 // 时间轮实例对象 -1
                instanceCounter.decrementAndGet();
            }
        	// CAS 操作失败,或者时间轮没有处理过任务,返回空的任务列表
            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                // 中断时间轮工作线程
                workerThread.interrupt();
                try {
                    // 终止时间轮的线程等待时间轮工作线程 100ms,这个过程主要是为了时间轮工作线程处理未执行的任务
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            instanceCounter.decrementAndGet();
        }
        // 返回未处理的任务
        return worker.unprocessedTimeouts();
    }

                        
                      

写在后面

SOFAJRaft采用的是Netty的时间轮算法来实现任务调度器,但是Netty的时间轮算法存在一定缺陷,比如:它是通过 单线程实现 的,如果在执行任务的过程中出现阻塞,会影响后面任务执行。Netty 中的时间轮并 不适合创建延迟时间跨度很大的任务 ,比如往时间轮内丢成百上千个任务并设置 10 天后执行,这样可能会导致链表过长 round 值很大,而且这些任务在执行之前会一直占用内存。 在阅读这部分代码的时候,发生了一个有趣的事情。作者发现在时间轮算法中有一部分代码是可以被优化的: 在HashedWheelTimer#normalizeTicksPerWheel方法中,当ticksPerWheel的值较大时,这个方法会循环很多次,方法执行时间会不稳定,导致效率可能会偏低。感觉可以使用java8 HashMap的相关实现来完善改算法,具体实现如下:

                        
                          int n = ticksPerWheel - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 此处1073741824 = 2^30,防止溢出
return (n < 0) ? 1 : (n >= 1073741824) ? 1073741824 : n + 1;

                        
                      

想到这些,于是我就去给SOFAJRaft社区提了一个issue,得到了几位大佬的approve。于是乎我就提了PR。虽然看代码确实头疼,但是整个过程还是挺快乐.

到这里SOFAJRaft的定时任务调度器就差不多完整的走了一遍,第一遍看确实很容易懵逼,但是再读几遍还是会感觉很有成就感的。作者总结完这篇文章,差不多刚过完年,希望后面能继续坚持下去.

最后此篇关于SOFAJRaft源码阅读(肆)-Netty时间轮算法的实践的文章就讲到这里了,如果你想了解更多关于SOFAJRaft源码阅读(肆)-Netty时间轮算法的实践的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com