跳至主要內容

限流设计

HeChuangJun约 2300 字大约 8 分钟

限流?为什么?和熔断、削峰区别?

限制系统在某一时间段内接受到的请求,避免流量多到超出负载使系统宕机

限流发生在流量进来前,对超过的流量进行限制。
熔断发生在流量进来后,系统发生故障时会自动切断请求,防止故障进一步扩展,导致服务雪崩
削峰是对流量的平滑处理,缓慢增加请求的处理速率来避免系统瞬时过载

削峰是水库,把流量储存起来,慢慢流,限流是闸口,拒绝超出的流量

限流实现步骤

限流设计.png
通过计数器、滑动窗口等方式统计请求流量(数量或速率)。单机版储存到本地。集群存储到Redis
根据设定的限制条件,判断当前请求流量是否超过限制
如果请求流量超过限制,执行限流策略,如拒绝请求、延迟处理、返回错误信息等
根据请求的处理结果,更新统计信息,如增加计数器的值、更新滑动窗口的数据等
重复执行以上步骤:不断地~

限流算法及实现

使用Redis作为分布式存储;Redission作为Redis客户端
//单例模式获取RedissonClient
public class RedissonConfig {
  private static final String REDIS_ADDRESS = "redis://127.0.0.1:6379";
  private static volatile  RedissonClient redissonClient;
  public static RedissonClient getInstance(){
    if (redissonClient==null){
      synchronized (RedissonConfig.class){
        if (redissonClient==null){
          Config config = new Config();
          config.useSingleServer().setAddress(REDIS_ADDRESS);
          redissonClient = Redisson.create(config);
          return redissonClient;
        }
      }
    }
    return redissonClient;
  }
}
固定窗口限流算法/计数器

将时间划分为固定长度的窗口(如每分钟),在每个时间窗口内,限制能够接受的最大请求数。
每个窗口期通过incrementAndGet操作统计请求的数量。窗口期结束利用键过期自动重置计数
实现简单,适用于流量比较稳定、请求数不太波动的场景。
不平滑和突发性流量:窗口切换的瞬间可能出现请求过多,造成系统的瞬时压力。

public class FixedWindowRateLimiter {
    public static final String KEY = "fixedWindowRateLimiter:";
    //请求限制数量
    private Long limit;
    //窗口大小(单位:S)
    private Long windowSize;

    public FixedWindowRateLimiter(Long limit, Long windowSize) {
        this.limit = limit;
        this.windowSize = windowSize;
    }

    //固定窗口限流
    public boolean triggerLimit(String path) {
        RedissonClient redissonClient = RedissonConfig.getInstance();
        //加分布式锁,防止并发情况下窗口初始化时间不一致问题
        RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
        try {
            rLock.lock(100, TimeUnit.MILLISECONDS);
            String redisKey = KEY + path;
            RAtomicLong counter = redissonClient.getAtomicLong(redisKey);
            //计数
            long count = counter.incrementAndGet();
            //如果为1的话,就说明窗口刚初始化
            if (count == 1) {
                //直接设置过期时间,作为窗口
                counter.expire(windowSize, TimeUnit.SECONDS);
            }
            //触发限流
            if (count > limit) {
                //触发限流的不记在请求数量中
                counter.decrementAndGet();
                return true;
            }
            return false;
        } finally {
            rLock.unlock();
        }
    }
}

class FixedWindowRateLimiterTest {
    ThreadPoolExecutor threadPoolExecutor = 
    new ThreadPoolExecutor(20, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("1min限制10次请求固定窗口测试")
    void triggerLimit() throws InterruptedException {
        FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(10L,60L);
        //模拟不同窗口内的调用
        for (int i = 0; i < 3; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            //20个线程并发调用
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = fixedWindowRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            TimeUnit.MINUTES.sleep(1);
        }
    }
}
滑动窗口算法

将一个大时间窗口划分为多个小时间窗口,每个小窗口都有独立的计数。判断请求的次数是否超过整个窗口的限制。窗口的移动是每次向前滑动一个小的单元窗口
例如将大时间窗口1min分成5个小窗口,每个小窗口的时间是12s。每个单元格有自己独立的计数器,每过12s就会向前移动一格。假如有请求在00:01的时候过来,这时窗口的计数3+12+9+15=39
解决了固定窗口的突发流量问题,适合于要求流量更加平滑、避免请求高峰集中在某个时间段的系统。
实现相对复杂
滑动窗口限流算法.png

使用Redis的zset结构。有请求过来时,把当前时间戳添加到zset。那么窗口之外的请求,根据窗口大小计算出起始时间戳,删除窗口外的请求。zset的大小就是窗口的请求数
zset在高并发情况下,时间戳可能会重复,导致统计的请求偏少,用时间戳+随机数或者生成唯一序列解决

public class SlidingWindowRateLimiter {
    public static final String KEY = "slidingWindowRateLimiter:";
    //请求次数限制
    private Long limit;
    //窗口大小S
    private Long windowSize;
    public SlidingWindowRateLimiter(Long limit, Long windowSize) {
        this.limit = limit;
        this.windowSize = windowSize;
    }
    public boolean triggerLimit(String path) {
        RedissonClient redissonClient = RedissonConfig.getInstance();
        //窗口计数
        RScoredSortedSet<Long> counter = redissonClient.getScoredSortedSet(KEY + path);
        //使用分布式锁,避免并发设置初始值的时候,导致窗口计数被覆盖
        RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
        try {
            rLock.lock(200, TimeUnit.MILLISECONDS);
            // 当前时间戳
            long currentTimestamp = System.currentTimeMillis();
            // 窗口起始时间戳
            long windowStartTimestamp = currentTimestamp - windowSize * 1000;
            // 移除窗口外的时间戳,左闭右开
            counter.removeRangeByScore(0, true, windowStartTimestamp, false);
            // 将当前时间戳作为score,也作为member,
            // TODO:高并发情况下可能没法保证唯一,可以加一个唯一标识
            counter.add(currentTimestamp, currentTimestamp);
            //使用zset的元素个数,作为请求计数
            long count = counter.size();
            // 判断时间戳数量是否超过限流阈值
            if (count > limit) {
                System.out.println("[triggerLimit] path:" + path + " count:" 
                + count + " over limit:" + limit);
                return true;
            }
            return false;
        } finally {
            rLock.unlock();
        }
    }

}
class SlidingWindowRateLimiterTest {
    ThreadPoolExecutor threadPoolExecutor = 
    new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("滑动窗口限流")
    void triggerLimit() throws InterruptedException {
        SlidingWindowRateLimiter slidingWindowRateLimiter = 
        new SlidingWindowRateLimiter(10L, 1L);
        //模拟在不同时间片内的请求
        for (int i = 0; i < 8; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = slidingWindowRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠10s
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}
漏桶算法(Leaky Bucket) 请求进入桶中,桶中的请求会按照固定的速率处理。 如果请求的到达速度超过桶的容量,则多余的请求将被丢弃。 漏桶算法限制了流量的最大输出速率

使用RScoredSortedSet,ZREMRANGEBYSCORE命令来删除旧请求
用zSet来存储path,定时任务处理所有path对应的桶的出水
用ScheduledExecutorService启动了定时任务,1s跑一次,
集群下用xxl-job去执行leakWater

稳定输出速率,不会受到突发流量的影响。
简单易实现
不能处理突发流量:可能会丢弃大量请求,不能像令牌桶那样灵活应对突发流量。

public class LeakyBucketRateLimiter {
    private RedissonClient redissonClient = RedissonConfig.getInstance();
    private static final String KEY_PREFIX = "LeakyBucket:";

    //桶的大小
    private Long bucketSize;
    //漏水速率,个/秒
    private Long leakRate;


    public LeakyBucketRateLimiter(Long bucketSize, Long leakRate) {
        this.bucketSize = bucketSize;
        this.leakRate = leakRate;
        //定时任务,每s执行一次
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);
    }

    //漏水
    public void leakWater() {
        RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
        //遍历所有path,删除旧请求
        for(String path:pathSet){
            String redisKey = KEY_PREFIX + path;
            RScoredSortedSet<Long> bucket = 
            redissonClient.getScoredSortedSet(KEY_PREFIX + path);
            long now = System.currentTimeMillis();
            // 删除旧的请求
            bucket.removeRangeByScore(0, true,now - 1000 * leakRate,true);
        }
    }

    //限流
    public boolean triggerLimit(String path) {
        //加锁,防止并发初始化问题
        RLock rLock = redissonClient.getLock(KEY_PREFIX + "LOCK:" + path);
        try {
            rLock.lock(100,TimeUnit.MILLISECONDS);
            String redisKey = KEY_PREFIX + path;
            RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(redisKey);
            //这里用一个set,来存储所有path
            RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
            pathSet.add(path);
            // 获取当前时间
            long now = System.currentTimeMillis();
            // 检查桶是否已满
            if (bucket.size() < bucketSize) {
                // 桶未满,添加一个元素到桶中
                bucket.add(now,now);
                return false;
            }
            // 桶已满,触发限流
            System.out.println("[triggerLimit] path:"+path+" bucket size:"+bucket.size());
            return true;
        }finally {
            rLock.unlock();
        }
    }
    
}

class LeakyBucketRateLimiterTest {

    ThreadPoolExecutor threadPoolExecutor = 
    new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("漏桶算法")
    void triggerLimit() throws InterruptedException {
        LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(10L, 1L);
        for (int i = 0; i < 8; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = leakyBucketRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}
令牌桶算法 使用一个桶来存储令牌,每个请求在到达时都需要获取一个令牌才能被处理。 以固定速率在桶中生成一定数量的令牌,当桶中的令牌数达到上限时,新的令牌会被丢弃。 如果桶中有令牌,请求可以直接处理;如果没有令牌,请求将被拒绝。 桶的容量决定了系统可以承受的最大突发请求量。

实现:开个线程定时往桶里投令牌,Redission提供令牌桶算法的实现

平滑流量:令牌桶算法允许在短时间内处理突发流量,只要桶中有足够的令牌,系统就可以接收突发流量。
灵活性高:令牌生成速率是固定的,突发流量可以由桶的容量来控制。

要维护一个令牌桶,增加了系统的复杂度。
如果突发流量超过了桶的容量,超出部分请求会被丢弃

Guava RateLimiter,基于令牌桶算法限流,单机
Sentinel ,基于滑动窗口限流,支持集群
网关限流,比如Spring Cloud Gateway、Nginx

public class TokenBucketRateLimiter {
    public static final String KEY = "TokenBucketRateLimiter:";

    //阈值
    private Long limit;
    //添加令牌的速率,单位:个/秒
    private Long tokenRate;

    public TokenBucketRateLimiter(Long limit, Long tokenRate) {
        this.limit = limit;
        this.tokenRate = tokenRate;
    }

    //限流
    public boolean triggerLimit(String path){
        RedissonClient redissonClient=RedissonConfig.getInstance();
        RRateLimiter rateLimiter = redissonClient.getRateLimiter(KEY+path);
        // 初始化,设置速率模式,速率,间隔,间隔单位
        rateLimiter.trySetRate(RateType.OVERALL, limit,
         tokenRate, RateIntervalUnit.SECONDS);
        // 获取令牌
        return rateLimiter.tryAcquire();
    }
}