限流设计
限流?为什么?和熔断、削峰区别?
限制系统在某一时间段内接受到的请求,避免流量多到超出负载使系统宕机
限流发生在流量进来前,对超过的流量进行限制。
熔断发生在流量进来后,系统发生故障时会自动切断请求,防止故障进一步扩展,导致服务雪崩
削峰是对流量的平滑处理,缓慢增加请求的处理速率来避免系统瞬时过载
削峰是水库,把流量储存起来,慢慢流,限流是闸口,拒绝超出的流量
限流实现步骤
通过计数器、滑动窗口等方式统计请求流量(数量或速率)。单机版储存到本地。集群存储到Redis
根据设定的限制条件,判断当前请求流量是否超过限制
如果请求流量超过限制,执行限流策略,如拒绝请求、延迟处理、返回错误信息等
根据请求的处理结果,更新统计信息,如增加计数器的值、更新滑动窗口的数据等
重复执行以上步骤:不断地~
限流算法及实现
使用Redis作为分布式存储;Redission作为Redis客户端
//单例模式获取RedissonClient
public class RedissonConfig {
private static final String REDIS_ADDRESS = "redis://127.0.0.1:6379";
private static volatile RedissonClient redissonClient;
public static RedissonClient getInstance(){
if (redissonClient==null){
synchronized (RedissonConfig.class){
if (redissonClient==null){
Config config = new Config();
config.useSingleServer().setAddress(REDIS_ADDRESS);
redissonClient = Redisson.create(config);
return redissonClient;
}
}
}
return redissonClient;
}
}
固定窗口限流算法/计数器
将时间划分为固定长度的窗口(如每分钟),在每个时间窗口内,限制能够接受的最大请求数。
每个窗口期通过incrementAndGet操作统计请求的数量。窗口期结束利用键过期自动重置计数
实现简单,适用于流量比较稳定、请求数不太波动的场景。
不平滑和突发性流量:窗口切换的瞬间可能出现请求过多,造成系统的瞬时压力。
public class FixedWindowRateLimiter {
public static final String KEY = "fixedWindowRateLimiter:";
//请求限制数量
private Long limit;
//窗口大小(单位:S)
private Long windowSize;
public FixedWindowRateLimiter(Long limit, Long windowSize) {
this.limit = limit;
this.windowSize = windowSize;
}
//固定窗口限流
public boolean triggerLimit(String path) {
RedissonClient redissonClient = RedissonConfig.getInstance();
//加分布式锁,防止并发情况下窗口初始化时间不一致问题
RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
try {
rLock.lock(100, TimeUnit.MILLISECONDS);
String redisKey = KEY + path;
RAtomicLong counter = redissonClient.getAtomicLong(redisKey);
//计数
long count = counter.incrementAndGet();
//如果为1的话,就说明窗口刚初始化
if (count == 1) {
//直接设置过期时间,作为窗口
counter.expire(windowSize, TimeUnit.SECONDS);
}
//触发限流
if (count > limit) {
//触发限流的不记在请求数量中
counter.decrementAndGet();
return true;
}
return false;
} finally {
rLock.unlock();
}
}
}
class FixedWindowRateLimiterTest {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(20, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
@Test
@DisplayName("1min限制10次请求固定窗口测试")
void triggerLimit() throws InterruptedException {
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(10L,60L);
//模拟不同窗口内的调用
for (int i = 0; i < 3; i++) {
CountDownLatch countDownLatch = new CountDownLatch(20);
//20个线程并发调用
for (int j = 0; j < 20; j++) {
threadPoolExecutor.execute(() -> {
boolean isLimit = fixedWindowRateLimiter.triggerLimit("/test");
System.out.println(isLimit);
countDownLatch.countDown();
});
}
countDownLatch.await();
TimeUnit.MINUTES.sleep(1);
}
}
}
滑动窗口算法
将一个大时间窗口划分为多个小时间窗口,每个小窗口都有独立的计数。判断请求的次数是否超过整个窗口的限制。窗口的移动是每次向前滑动一个小的单元窗口
例如将大时间窗口1min分成5个小窗口,每个小窗口的时间是12s。每个单元格有自己独立的计数器,每过12s就会向前移动一格。假如有请求在00:01的时候过来,这时窗口的计数3+12+9+15=39
解决了固定窗口的突发流量问题,适合于要求流量更加平滑、避免请求高峰集中在某个时间段的系统。
实现相对复杂
使用Redis的zset结构。有请求过来时,把当前时间戳添加到zset。那么窗口之外的请求,根据窗口大小计算出起始时间戳,删除窗口外的请求。zset的大小就是窗口的请求数
zset在高并发情况下,时间戳可能会重复,导致统计的请求偏少,用时间戳+随机数或者生成唯一序列解决
public class SlidingWindowRateLimiter {
public static final String KEY = "slidingWindowRateLimiter:";
//请求次数限制
private Long limit;
//窗口大小S
private Long windowSize;
public SlidingWindowRateLimiter(Long limit, Long windowSize) {
this.limit = limit;
this.windowSize = windowSize;
}
public boolean triggerLimit(String path) {
RedissonClient redissonClient = RedissonConfig.getInstance();
//窗口计数
RScoredSortedSet<Long> counter = redissonClient.getScoredSortedSet(KEY + path);
//使用分布式锁,避免并发设置初始值的时候,导致窗口计数被覆盖
RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
try {
rLock.lock(200, TimeUnit.MILLISECONDS);
// 当前时间戳
long currentTimestamp = System.currentTimeMillis();
// 窗口起始时间戳
long windowStartTimestamp = currentTimestamp - windowSize * 1000;
// 移除窗口外的时间戳,左闭右开
counter.removeRangeByScore(0, true, windowStartTimestamp, false);
// 将当前时间戳作为score,也作为member,
// TODO:高并发情况下可能没法保证唯一,可以加一个唯一标识
counter.add(currentTimestamp, currentTimestamp);
//使用zset的元素个数,作为请求计数
long count = counter.size();
// 判断时间戳数量是否超过限流阈值
if (count > limit) {
System.out.println("[triggerLimit] path:" + path + " count:"
+ count + " over limit:" + limit);
return true;
}
return false;
} finally {
rLock.unlock();
}
}
}
class SlidingWindowRateLimiterTest {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
@Test
@DisplayName("滑动窗口限流")
void triggerLimit() throws InterruptedException {
SlidingWindowRateLimiter slidingWindowRateLimiter =
new SlidingWindowRateLimiter(10L, 1L);
//模拟在不同时间片内的请求
for (int i = 0; i < 8; i++) {
CountDownLatch countDownLatch = new CountDownLatch(20);
for (int j = 0; j < 20; j++) {
threadPoolExecutor.execute(() -> {
boolean isLimit = slidingWindowRateLimiter.triggerLimit("/test");
System.out.println(isLimit);
countDownLatch.countDown();
});
}
countDownLatch.await();
//休眠10s
TimeUnit.SECONDS.sleep(10L);
}
}
}
漏桶算法(Leaky Bucket)
请求进入桶中,桶中的请求会按照固定的速率处理。 如果请求的到达速度超过桶的容量,则多余的请求将被丢弃。 漏桶算法限制了流量的最大输出速率使用RScoredSortedSet,ZREMRANGEBYSCORE命令来删除旧请求
用zSet来存储path,定时任务处理所有path对应的桶的出水
用ScheduledExecutorService启动了定时任务,1s跑一次,
集群下用xxl-job去执行leakWater
稳定输出速率,不会受到突发流量的影响。
简单易实现
不能处理突发流量:可能会丢弃大量请求,不能像令牌桶那样灵活应对突发流量。
public class LeakyBucketRateLimiter {
private RedissonClient redissonClient = RedissonConfig.getInstance();
private static final String KEY_PREFIX = "LeakyBucket:";
//桶的大小
private Long bucketSize;
//漏水速率,个/秒
private Long leakRate;
public LeakyBucketRateLimiter(Long bucketSize, Long leakRate) {
this.bucketSize = bucketSize;
this.leakRate = leakRate;
//定时任务,每s执行一次
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);
}
//漏水
public void leakWater() {
RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
//遍历所有path,删除旧请求
for(String path:pathSet){
String redisKey = KEY_PREFIX + path;
RScoredSortedSet<Long> bucket =
redissonClient.getScoredSortedSet(KEY_PREFIX + path);
long now = System.currentTimeMillis();
// 删除旧的请求
bucket.removeRangeByScore(0, true,now - 1000 * leakRate,true);
}
}
//限流
public boolean triggerLimit(String path) {
//加锁,防止并发初始化问题
RLock rLock = redissonClient.getLock(KEY_PREFIX + "LOCK:" + path);
try {
rLock.lock(100,TimeUnit.MILLISECONDS);
String redisKey = KEY_PREFIX + path;
RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(redisKey);
//这里用一个set,来存储所有path
RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
pathSet.add(path);
// 获取当前时间
long now = System.currentTimeMillis();
// 检查桶是否已满
if (bucket.size() < bucketSize) {
// 桶未满,添加一个元素到桶中
bucket.add(now,now);
return false;
}
// 桶已满,触发限流
System.out.println("[triggerLimit] path:"+path+" bucket size:"+bucket.size());
return true;
}finally {
rLock.unlock();
}
}
}
class LeakyBucketRateLimiterTest {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
@Test
@DisplayName("漏桶算法")
void triggerLimit() throws InterruptedException {
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(10L, 1L);
for (int i = 0; i < 8; i++) {
CountDownLatch countDownLatch = new CountDownLatch(20);
for (int j = 0; j < 20; j++) {
threadPoolExecutor.execute(() -> {
boolean isLimit = leakyBucketRateLimiter.triggerLimit("/test");
System.out.println(isLimit);
countDownLatch.countDown();
});
}
countDownLatch.await();
TimeUnit.SECONDS.sleep(10L);
}
}
}
令牌桶算法
使用一个桶来存储令牌,每个请求在到达时都需要获取一个令牌才能被处理。 以固定速率在桶中生成一定数量的令牌,当桶中的令牌数达到上限时,新的令牌会被丢弃。 如果桶中有令牌,请求可以直接处理;如果没有令牌,请求将被拒绝。 桶的容量决定了系统可以承受的最大突发请求量。实现:开个线程定时往桶里投令牌,Redission提供令牌桶算法的实现
平滑流量:令牌桶算法允许在短时间内处理突发流量,只要桶中有足够的令牌,系统就可以接收突发流量。
灵活性高:令牌生成速率是固定的,突发流量可以由桶的容量来控制。
要维护一个令牌桶,增加了系统的复杂度。
如果突发流量超过了桶的容量,超出部分请求会被丢弃
Guava RateLimiter,基于令牌桶算法限流,单机
Sentinel ,基于滑动窗口限流,支持集群
网关限流,比如Spring Cloud Gateway、Nginx
public class TokenBucketRateLimiter {
public static final String KEY = "TokenBucketRateLimiter:";
//阈值
private Long limit;
//添加令牌的速率,单位:个/秒
private Long tokenRate;
public TokenBucketRateLimiter(Long limit, Long tokenRate) {
this.limit = limit;
this.tokenRate = tokenRate;
}
//限流
public boolean triggerLimit(String path){
RedissonClient redissonClient=RedissonConfig.getInstance();
RRateLimiter rateLimiter = redissonClient.getRateLimiter(KEY+path);
// 初始化,设置速率模式,速率,间隔,间隔单位
rateLimiter.trySetRate(RateType.OVERALL, limit,
tokenRate, RateIntervalUnit.SECONDS);
// 获取令牌
return rateLimiter.tryAcquire();
}
}