gpt4 book ai didi

RateLimiter 源码分析

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 28 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章RateLimiter 源码分析由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

俗话说得好,缓存,限流和降级是系统的三把利剑。刚好项目中每天早上导出数据时因调订单接口频率过高,订单系统担心会对用户侧的使用造成影响,让我们对调用限速一下,所以就正好用上了。  。

常用的限流算法有2种:漏桶算法和令牌桶算法.

漏桶算法 。

漏桶算法:请求先进入“桶”中,然后桶以一定的速率处理请求。如果请求的速率过快会导致桶溢出。根据描述可以知道,漏桶算法会强制限制请求处理的速度。任你请求的再快还是再慢,我都是以这种速率来处理。  。

但是对于很多情况下,除了要求能够限制平均处理速度外,还要求能允许一定程度的的突发情况。这样的话,漏桶算法就不合适了,用令牌桶算法更合适.

令牌桶算法 。

令牌桶算法的原理是:系统以恒定的速率往桶里丢一定数量的令牌,请求只有拿到了令牌才能处理。当桶里没有令牌时便可拒绝服务。  。

Guava中的Ratelimiter便是实现的令牌桶算法,同时能支持一定程度的突发请求.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static RateLimiter one=RateLimiter.create( 2 ); //每秒2个
   private static RateLimiter two=RateLimiter.create( 2 ); //每秒2个
   private RateLimitUtil(){};
   public static void acquire(RateLimiter r, int num){
     double time =r.acquire(num);
     System.out.println( "wait time=" +time);
   }
   public static void main(String[] args) throws InterruptedException {
     acquire(one, 1 );
     acquire(one, 1 );
     acquire(one, 1 );
     System.out.println( "-----" );
     acquire(two, 10 );
     acquire(two, 1 );
   }

  。

输出结果:

?
1
2
3
4
5
6
wait time= 0.0
wait time= 0.499163
wait time= 0.489308
-----
wait time= 0.0
wait time= 4.497819

可以看到,我们以每秒2个请求的速度生成令牌。对one来说,当第2次和第3次获取请求的时候,等待的时间加起来就差不多刚好是1秒。对two来说,当第一次获取了10个令牌之后,第二次获取1个请求,就差不多等待5S(10/2=5)。可以看到,guava通过限制后面请求的等待时间,来支持一定程度的突发请求.

接下来,就是通过源码来解析它!  。

当我第一次看到令牌桶的算法描述的时候,我还以为真是有一个线程每隔X秒往一个类似计数器的地方加数字呢….  。

guava的限流算法有2种模式,一种是稳定速度,还有一种是生成令牌的速度慢慢提升直到维持在一个稳定的速度。2种模式原理类似,只是在具体等待多久的时间计算上有区别。以下就专门指稳定速度的模式.

先来看看它的acquire()方法:

?
1
2
3
4
5
public double acquire( int permits) {
   long microsToWait = reserve(permits); //先计算获取这些请求需要让线程等待多长时间
   stopwatch.sleepMicrosUninterruptibly(microsToWait); //让线程阻塞microTowait微秒长的时间
   return 1.0 * microsToWait / SECONDS.toMicros(1L); //返回阻塞的时间
  }

主要分3步:  。

1. 根据limiter创建时传入的参数,计算出生成这些数量的令牌需要多长的时间。  。

2. 让线程阻塞microTowait这么长的时间(单位:微秒)  。

3. 再返回阻塞了多久,单位:秒 。

具体它是怎么计算需要多长时间的呢?让我们来看看reserve(permits)方法.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final long reserve( int permits) {
   checkPermits(permits); //检查参数是否合法
   synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
   }
  }
    
    
    
  final long reserveAndGetWaitLength( int permits, long nowMicros) {
   long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
   return max(momentAvailable - nowMicros, 0 );
  }
    
    
    
  final long reserveEarliestAvailable( int requiredPermits, long nowMicros) {
   resync(nowMicros); //here
   long returnValue = nextFreeTicketMicros;
   double storedPermitsToSpend = min(requiredPermits, this .storedPermits);
   double freshPermits = requiredPermits - storedPermitsToSpend;
   long waitMicros = storedPermitsToWaitTime( this .storedPermits, storedPermitsToSpend)
     + ( long ) (freshPermits * stableIntervalMicros);
   this .nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
   this .storedPermits -= storedPermitsToSpend;
   return returnValue;
  }

最终调用的是reserveEarliestAvailable方法。先看看resync(nowMicros)方法.

?
1
2
3
4
5
6
7
8
private void resync( long nowMicros) {
   // if nextFreeTicket is in the past, resync to now
   if (nowMicros > nextFreeTicketMicros) {
    storedPermits = min(maxPermits,
      storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
    nextFreeTicketMicros = nowMicros;
   }
  }

nextFreeTicketMicros的意思是:下次获取的时候需要减去的时间。如果是第一次调用accquire()方法,那nowMicros - nextFreeTicketMicros 就是从初始化(初始化的时候会给nextFreeTicketMicros 赋值一次,具体可以看RateLimiter的构造器)到第一次请求,这中间发生的时间。  。

这个方法的意思,如果当前时间比上一轮设置的下次获取的时间大(因为存在提前获取的情况,比如上次直接获取了10个,那上轮设置的nextFreeTicketMicros就是上一轮的时间+5s。后面会提到),那就计算这个中间理论上能生成多少的令牌。比如这中间隔了1秒钟,然后stableIntervalMicros=5000(稳定生成速度的情况下),那么,就这中间就可以生成2个令牌。再加上它原先存储的storedPermits个,如果比maxPermits大,那最大也只能存maxPermits这么多。如果比maxPermits小,那就是storedPermits=原先存的+这中间生成的数量。同时记录下下次获取的时候需要减去的时间,也就是当前时间 (nextFreeTicketMicros )。  。

接下来继续看reserveEarliestAvailable方法:

?
1
2
3
4
5
6
7
8
9
10
11
final long reserveEarliestAvailable( int requiredPermits, long nowMicros) { //1
   resync(nowMicros);   //2
   long returnValue = nextFreeTicketMicros; //3
   double storedPermitsToSpend = min(requiredPermits, this .storedPermits); //4
   double freshPermits = requiredPermits - storedPermitsToSpend; //5
   long waitMicros = storedPermitsToWaitTime( this .storedPermits, storedPermitsToSpend)
     + ( long ) (freshPermits * stableIntervalMicros); //6
   this .nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; //7
   this .storedPermits -= storedPermitsToSpend; //8
   return returnValue; //9
  }

我们一行一行来看:  。

第二行设置好之后。第3行中将下次获取的时候需要减去的时间作为返回值(这点很重要)。  。

这2句是什么意思呢?  。

其实这2句就是使得RateLimiter能一定程度的突发请求的原因。假设requiredPermits=10,而我们能存的storedPermits=2,那么freshPermits=8,也就是多取了8个。而第6行就是计算这多取的8个需要多长时间才能生成?需要3秒。那么,就将这3秒钟加到我们前面赋值的“下次获取的时候需要减去的时间 ”。  。

比如在05秒的时候一次性获取了10个,那么,第7行的意思就是nextFreeTicketMicros=13S对应的系统的毫秒数。然后storedPermits就是-8。当过了1秒钟,下一次请求来调用acquire(1)的时候,resync方法中由于nowMicros 。

?
1
2
3
4
final long reserveAndGetWaitLength( int permits, long nowMicros) {
   long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
   return max(momentAvailable - nowMicros, 0 ); //取较大的值
  }

也就是说,reserveAndGetWaitLength会返回max(13-6,0),也就是7。而该方法的返回值又是用于sleep线程的,也就是我们在一开始看到的:

?
1
2
3
4
5
public double acquire( int permits) {
   long microsToWait = reserve(permits);
   stopwatch.sleepMicrosUninterruptibly(microsToWait);
   return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

总结起来,最主要的是nowMicros,nextFreeTicketMicros这2个值。nextFreeTicketMicros在一开始构造器执行的时候会赋值一次为构造器执行的时间。当第一次调用accquire()的时候,resync会被执行,然后在accquire()中将nextFreeTicketMicros设置为当前时间。但是,需要注意的是,在reserveEarliestAvailable中会根据请求的令牌数和当前存储的令牌数进行比较。如果请求的令牌数很大,则会计算出生成这些多余的令牌需要的时间,并加在nextFreeTicketMicros上,从而保证下次调用accquire()的时候,根据nextFreeTicketMicros和当时的nowMicros相减,若>0,则需要等到对应的时间。也就能应对流量的突增情况了。  。

所以最重要的是nextFreeTicketMicros,它记录了你这次获取的时候,能够开始生成令牌的时间。比如当前是05S,那若nextFreeTicketMicros=10,表示它要到10S才能开始生成令牌,谁叫前面的多拿了这么多呢。至于它这次是多拿了还是只是拿一个令牌,等待时间都是这么多。如果这次又多拿了,那下次就等待更久! 。

?
1
2
3
4
5
6
7
8
9
10
11
12
private static RateLimiter too=RateLimiter.create( 2 ); //每秒2个
   private RateLimitUtil(){};
   public static void acquire(RateLimiter r, int num){
     double time =r.acquire(num);
     System.out.println( "wait time=" +time);
   }
   public static void main(String[] args) throws InterruptedException {
     acquire(too, 1 );
     acquire(too, 10 ); //只等待了0.5秒就获取了10个
     acquire(too, 10 ); //等待了5秒就获取了10个
     acquire(too, 1 ); //虽然只获取1个,也是等待5秒
   }

总结 。

以上就是本文关于RateLimiter 常用方法以及源码分析的全部内容,希望对大家有所帮助。感谢大家对我网站的支持! 。

原文链接:http://blog.csdn.net/foolishandstupid/article/details/76285690#comments 。

最后此篇关于RateLimiter 源码分析的文章就讲到这里了,如果你想了解更多关于RateLimiter 源码分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com