RateLimiter源码解析
RateLimiter
是Guava
包提供的限流器,采用了令牌桶算法,特定是均匀地向桶中添加令牌,每次消费时也必须持有令牌,否则就需要等待。应用场景之一是限制消息消费的速度,避免消息消费过快而对下游的数据库造成较大的压力。
本文主要介绍RateLimiter
的源码,包括他的基本限流器SmoothBursty
,以及带预热效果的SmoothWarmingUp
。RateLimiter
作为限流器的顶层类,只有两个属性:
private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUseDirectly;
stopwatch
用来计算时间间隔,以及实现了当拿不到令牌时将线程阻塞的功能;mutexDoNotUseDirectly
主要用来进行线程同步。RateLimiter
作为一个抽象类,本身不能直接实例化,可以使用静态工厂方法来创建:
public static RateLimiter create(double permitsPerSecond); //①
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod); //②
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) //③
RateLimiter
对外提供了3个构造器,分成两类,构造器①是第一类,底层会创建基本限流器SmoothBursty
;构造器②和③是第二类,底层会创建带warm up
效果的SmoothWarmingUp
。参数permitsPerSecond
表示每秒产生多少个令牌,参数warmupPeriod
是限流器warm up
阶段的时间,即限流器产生令牌从最慢到最快所需要的时间,参数unit
是warm up
的时间单位。SmoothRateLimiter
新增了4个属性:
//桶中存储的令牌数
double storedPermits;
//桶中允许的最大令牌数
double maxPermits;
//稳定状态下产生令牌是速度,其值为1/permitsPerSecond
double stableIntervalMicros;
//下一次请求需要等待的时间
private long nextFreeTicketMicros = 0L; // could be either in the past or future
这其中比较有意思的是nextFreeTicketMicros
字段,它表示下一次获取令牌的请求到来时需要等待的时间,该字段可以实现上一次获取令牌的请求预支的等待时间由下一次请求来兑现。
接下来先介绍下SmoothBursty
的构造过程:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
构造SmoothBursty
时出传入了两个参数,stopwatch
好理解,第二个参数意思是当限流器长时间没用时,令牌桶内最多存储多少秒的令牌,这里限定了最多只存储1秒钟的令牌,也就是permitsPerSecond
个。
我们继续分析setRate
方法的实现:
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
setRate
方法先校验permitsPerSecond
必须为整数,然后在同步块中执行doSetRate
方法。mutex
方法通过双重检测的方式实例化mutexDoNotUseDirectly
字段,详细代码略去,doSetRate
是抽象方法,其具体的实现在抽象子类SmoothRateLimiter
中:
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
doSetRate
方法主要是设置了stableIntervalMicros
字段,调用的两个方法resync
和重载方法doSetRate
我们接着分析。resync
方法主要用来设置storedPermits
和nextFreeTicketMicros
这俩字段,代码如下:
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
//计算超过的这些时间里产生了多少新的令牌
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
//重新计算当前令牌桶内持有的令牌数
storedPermits = min(maxPermits, storedPermits + newPermits);
//更新下次准许获取令牌的时间为当前时间
nextFreeTicketMicros = nowMicros;
}
}
此方法会根据当前的时间决定是否进行字段赋值,如果当前时间已经超过了nextFreeTicketMicros
的值,那么就重新计算storedPermits
和nextFreeTicketMicros
字段,其中计算storedPermits
的代码虽然容易理解,但是思路挺巧妙。一般来说,令牌桶算法的令牌需要以固定的速率进行添加,那么很自然想到可以起一个任务,按照一定的速度产生令牌,但是起一个新任务会占用一定的资源,从而加重系统的负担,此处的实现是根据利用时间差来计算这段时间产生的令牌数,以简单的计算完成了新任务需要做的事情,开销大大减少了。coolDownIntervalMicros
方法是抽象方法,在SmoothBursty
和SmoothWarmingUp
有不同的实现,在SmoothBursty
的实现是直接返回stableIntervalMicros
字段,这个字段目前还没设置过值,取默认值0.0,这里double的除零操作并不会抛异常,而是会返回无穷大。
我们接着看一下doSetRate
方法,这也是个抽象方法,在SmoothBursty
的实现如下:
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
maxPermits
在此之前并没有设置过值,因此默认是0.0,这里只是将storedPermits
初始化成了0。不过这里的代码也说明,在执行期间maxPermits
是可以在其他地方被修改的,如果出现了更改,就会等比例修改storedPermits
的值。
到这里SmoothBursty
的初始化过程就结束了,大体上是将内部的字段赋予了初始值。我们接下来看看SmoothBursty
的使用:
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
acquire
方法用于从令牌桶中获取令牌,参数permits
表示需要获取的令牌数量,如果当前没办法拿到需要的令牌,线程会阻塞一段时间,该方法返回等待的时间,reserve
的实现如下:
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
//返回等待时间,如果不需要等待,返回0
return max(momentAvailable - nowMicros, 0);
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
//取可用的令牌与需要的令牌两者的最小值
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//计算该次请求超出的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
//扣减令牌桶库存
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
reserve
的核心逻辑在reserveEarliestAvailable
方法中,该方法的主要思想是检查当前令牌桶内令牌数是否满足需求,如果满足则不需要额外的等待时间,否则需要将额外等待时间追加到nextFreeTicketMicros
,需要注意的是方法返回的不是更新过后的nextFreeTicketMicros
,而是上一次请求更新的时间,这个时间就是当前线程需要阻塞的时间,也就是说,当前请求所需要等待的时间是由下次请求完成的,下次请求需要的等待时间由下下次请求完成,以此类推。当前请求的令牌数超过令牌桶中的令牌数越多,下次请求需要等待的时间就越长。并且这里并没有对requiredPermits
的上限做检查,这就允许预支令牌,即假设桶的上限是100个令牌,一次请求可以允许超过100个令牌,只是生成多余令牌的时间需要算到下一个请求上。同时这里的逻辑也说明,获取令牌是直接成功的,只是获取完令牌后才需要一小段等待时间。
到这里SmoothBursty
的初始化以及获取令牌的所有逻辑就介绍完了,接下来看看另一个类SmoothWarmingUp
的源码。
static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
我们之前介绍的另外领个构造器的底层调用的是这个包级的create
方法,该方法的5个参数中,只有coldFactor
是新出现的,字面意思是冷启动因子,源码写死了是3.0,该值表示指在warm up
阶段开始时,以多大的速率产生令牌,速率是稳定速率的三分之一,冷启动阶段结束后恢复到正常速率。setRate
方法底层会调用如下的doSetRate
方法:
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
//设置冷启动生成令牌的间隔是正常值的3倍(codeFactor固定为3)
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
//slope是梯形部分斜线的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
doSetRate
的代码不容易理解,源码中利用图示介绍了几个变量之间的关系(但是本人仍然不是很理解,因此只能将结论放在这里,无法进行更多解释),如图所示,源码注释中说明了如下的两个等式:
- 梯形的面积等于预热时间
warmupPeriodMicros
warmupPeriodMicros = 0.5 * (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits)
由此可以得到maxPermits
的值:
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
- 左边矩形的面积是梯形面积的一半,由此可知:
warmupPeriodMicros * 0.5 = thresholdPermits * stableIntervalMicros
计算出thresholdPermits
的值为:
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros
SmoothWarmingUp
的初始化逻辑到这里就结束了,接下来介绍下它获取令牌的流程,acquire
方法的其他部分上文已经结束过,此处重点介绍storedPermitsToWaitTime
方法:
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
//存储的令牌数量超出thresholdPermits的部分,这部分反应在梯形区域
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
//permitsAboveThresholdToTake表示梯形区域的高
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
//length计算的是梯形的上底+下底
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
//梯形区域的面积,即生产梯形区域的令牌数所需要的时间
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
//扣除掉需要消耗的梯形区域的令牌数,表示还需要从左侧矩形区域取得的令牌数量
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
//等待时间=梯形区域的时间+矩形区域的时间
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
//由前文可知,slope = =y/x = 产生令牌间隔/令牌数,permits * slope表示产生令牌间隔的增量,加上stableIntervalMicros表示梯形的底
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
此处的storedPermitsToWaitTime
与SmoothBursty
中的实现大不相同,SmoothBursty
由于不需要预热,可以直接获取桶中的令牌,因此直接返回了0,而此处存在预热阶段,不能直接获取到令牌,因此计算逻辑稍微复杂些,总体来说,就是求图中阴影部分的面积。