RateLimiterGuava包提供的限流器,采用了令牌桶算法,特定是均匀地向桶中添加令牌,每次消费时也必须持有令牌,否则就需要等待。应用场景之一是限制消息消费的速度,避免消息消费过快而对下游的数据库造成较大的压力。
本文主要介绍RateLimiter的源码,包括他的基本限流器SmoothBursty,以及带预热效果的SmoothWarmingUp

RateLimiter作为限流器的顶层类,只有两个属性:

  private final SleepingStopwatch stopwatch;
  private volatile Object mutexDoNotUseDirectly;

stopwatch用来计算时间间隔,以及实现了当拿不到令牌时将线程阻塞的功能;mutexDoNotUseDirectly主要用来进行线程同步。
RateLimiter作为一个抽象类,本身不能直接实例化,可以使用静态工厂方法来创建:

 public static RateLimiter create(double permitsPerSecond);  //①
 public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod);  //②
 public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)  //③

RateLimiter对外提供了3个构造器,分成两类,构造器①是第一类,底层会创建基本限流器SmoothBursty;构造器②和③是第二类,底层会创建带warm up效果的SmoothWarmingUp。参数permitsPerSecond表示每秒产生多少个令牌,参数warmupPeriod是限流器warm up阶段的时间,即限流器产生令牌从最慢到最快所需要的时间,参数unitwarm up的时间单位。
SmoothRateLimiter新增了4个属性:

  //桶中存储的令牌数
  double storedPermits;
  //桶中允许的最大令牌数
  double maxPermits;
  //稳定状态下产生令牌是速度,其值为1/permitsPerSecond
  double stableIntervalMicros;
  //下一次请求需要等待的时间
  private long nextFreeTicketMicros = 0L; // could be either in the past or future

这其中比较有意思的是nextFreeTicketMicros字段,它表示下一次获取令牌的请求到来时需要等待的时间,该字段可以实现上一次获取令牌的请求预支的等待时间由下一次请求来兑现。
接下来先介绍下SmoothBursty的构造过程:

public static RateLimiter create(double permitsPerSecond) {
	return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
	RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
	rateLimiter.setRate(permitsPerSecond);
	return rateLimiter;
}

构造SmoothBursty时出传入了两个参数,stopwatch好理解,第二个参数意思是当限流器长时间没用时,令牌桶内最多存储多少秒的令牌,这里限定了最多只存储1秒钟的令牌,也就是permitsPerSecond个。
我们继续分析setRate方法的实现:

  public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

setRate方法先校验permitsPerSecond必须为整数,然后在同步块中执行doSetRate方法。mutex方法通过双重检测的方式实例化mutexDoNotUseDirectly字段,详细代码略去,doSetRate是抽象方法,其具体的实现在抽象子类SmoothRateLimiter中:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

doSetRate方法主要是设置了stableIntervalMicros字段,调用的两个方法resync和重载方法doSetRate我们接着分析。resync方法主要用来设置storedPermitsnextFreeTicketMicros这俩字段,代码如下:

  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      //计算超过的这些时间里产生了多少新的令牌
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      //重新计算当前令牌桶内持有的令牌数
      storedPermits = min(maxPermits, storedPermits + newPermits);
      //更新下次准许获取令牌的时间为当前时间
      nextFreeTicketMicros = nowMicros;
    }
  }

此方法会根据当前的时间决定是否进行字段赋值,如果当前时间已经超过了nextFreeTicketMicros的值,那么就重新计算storedPermitsnextFreeTicketMicros字段,其中计算storedPermits的代码虽然容易理解,但是思路挺巧妙。一般来说,令牌桶算法的令牌需要以固定的速率进行添加,那么很自然想到可以起一个任务,按照一定的速度产生令牌,但是起一个新任务会占用一定的资源,从而加重系统的负担,此处的实现是根据利用时间差来计算这段时间产生的令牌数,以简单的计算完成了新任务需要做的事情,开销大大减少了。coolDownIntervalMicros方法是抽象方法,在SmoothBurstySmoothWarmingUp有不同的实现,在SmoothBursty的实现是直接返回stableIntervalMicros字段,这个字段目前还没设置过值,取默认值0.0,这里double的除零操作并不会抛异常,而是会返回无穷大。
我们接着看一下doSetRate方法,这也是个抽象方法,在SmoothBursty的实现如下:

    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

maxPermits在此之前并没有设置过值,因此默认是0.0,这里只是将storedPermits初始化成了0。不过这里的代码也说明,在执行期间maxPermits是可以在其他地方被修改的,如果出现了更改,就会等比例修改storedPermits的值。
到这里SmoothBursty的初始化过程就结束了,大体上是将内部的字段赋予了初始值。我们接下来看看SmoothBursty的使用:

  public double acquire() {
    return acquire(1);
  }

  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

acquire方法用于从令牌桶中获取令牌,参数permits表示需要获取的令牌数量,如果当前没办法拿到需要的令牌,线程会阻塞一段时间,该方法返回等待的时间,reserve的实现如下:

  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    //返回等待时间,如果不需要等待,返回0
    return max(momentAvailable - nowMicros, 0);
  }

  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    //取可用的令牌与需要的令牌两者的最小值
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    //计算该次请求超出的令牌数
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    //扣减令牌桶库存
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

reserve的核心逻辑在reserveEarliestAvailable方法中,该方法的主要思想是检查当前令牌桶内令牌数是否满足需求,如果满足则不需要额外的等待时间,否则需要将额外等待时间追加到nextFreeTicketMicros,需要注意的是方法返回的不是更新过后的nextFreeTicketMicros,而是上一次请求更新的时间,这个时间就是当前线程需要阻塞的时间,也就是说,当前请求所需要等待的时间是由下次请求完成的,下次请求需要的等待时间由下下次请求完成,以此类推。当前请求的令牌数超过令牌桶中的令牌数越多,下次请求需要等待的时间就越长。并且这里并没有对requiredPermits的上限做检查,这就允许预支令牌,即假设桶的上限是100个令牌,一次请求可以允许超过100个令牌,只是生成多余令牌的时间需要算到下一个请求上。同时这里的逻辑也说明,获取令牌是直接成功的,只是获取完令牌后才需要一小段等待时间。
到这里SmoothBursty的初始化以及获取令牌的所有逻辑就介绍完了,接下来看看另一个类SmoothWarmingUp的源码。

  static RateLimiter create(
      double permitsPerSecond,
      long warmupPeriod,
      TimeUnit unit,
      double coldFactor,
      SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

我们之前介绍的另外领个构造器的底层调用的是这个包级的create方法,该方法的5个参数中,只有coldFactor是新出现的,字面意思是冷启动因子,源码写死了是3.0,该值表示指在warm up阶段开始时,以多大的速率产生令牌,速率是稳定速率的三分之一,冷启动阶段结束后恢复到正常速率。
setRate方法底层会调用如下的doSetRate方法:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

   void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
	  double oldMaxPermits = maxPermits;
	  //设置冷启动生成令牌的间隔是正常值的3倍(codeFactor固定为3)
	  double coldIntervalMicros = stableIntervalMicros * coldFactor;
	  thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
	  maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
	  //slope是梯形部分斜线的斜率
	  slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
	  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
	    // if we don't special-case this, we would get storedPermits == NaN, below
	    storedPermits = 0.0;
	  } else {
	    storedPermits =
	        (oldMaxPermits == 0.0)
	            ? maxPermits // initial state is cold
	            : storedPermits * maxPermits / oldMaxPermits;
	  }
	}


doSetRate的代码不容易理解,源码中利用图示介绍了几个变量之间的关系(但是本人仍然不是很理解,因此只能将结论放在这里,无法进行更多解释),如图所示,源码注释中说明了如下的两个等式:

  • 梯形的面积等于预热时间warmupPeriodMicros
warmupPeriodMicros = 0.5 * (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits)

由此可以得到maxPermits的值:

maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
  • 左边矩形的面积是梯形面积的一半,由此可知:
warmupPeriodMicros * 0.5 = thresholdPermits * stableIntervalMicros

计算出thresholdPermits的值为:

thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros

SmoothWarmingUp的初始化逻辑到这里就结束了,接下来介绍下它获取令牌的流程,acquire方法的其他部分上文已经结束过,此处重点介绍storedPermitsToWaitTime方法:

    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      //存储的令牌数量超出thresholdPermits的部分,这部分反应在梯形区域
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
      	//permitsAboveThresholdToTake表示梯形区域的高
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        //length计算的是梯形的上底+下底
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        //梯形区域的面积,即生产梯形区域的令牌数所需要的时间
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        //扣除掉需要消耗的梯形区域的令牌数,表示还需要从左侧矩形区域取得的令牌数量
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      //等待时间=梯形区域的时间+矩形区域的时间
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    //由前文可知,slope = =y/x = 产生令牌间隔/令牌数,permits * slope表示产生令牌间隔的增量,加上stableIntervalMicros表示梯形的底
    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }


此处的storedPermitsToWaitTimeSmoothBursty中的实现大不相同,SmoothBursty由于不需要预热,可以直接获取桶中的令牌,因此直接返回了0,而此处存在预热阶段,不能直接获取到令牌,因此计算逻辑稍微复杂些,总体来说,就是求图中阴影部分的面积。

版权声明:本文为NaLanZiYi-LinEr原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/NaLanZiYi-LinEr/p/14532431.html