Skip to main content

Distributed Rate Limiter 限流器

1. Single Node 单机限流器

1.1 Guava RateLimiter

让我们考虑一个常见的场景:一个 RESTful API 服务器,它提供了一个查询数据的端点。但是,为了防止滥用,我们希望限制每秒的请求数量。

  • rateLimiter.acquire(): Tries to get a token. If none are available, it will wait and block the program. 如果没有可用的令牌,它会阻塞直到有令牌可用或者超时。
  • rateLimiter.tryAcquire(): Tries to get a token. If none are available, it returns false right away. 如果没有可用的令牌,它会立即返回false而不是阻塞。
  • RateLimiter.create(5.0): It will add 5 tokens every seacond. 每秒产生5个令牌。
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ApiController {

// 创建一个每秒产生5个令牌的RateLimiter
private static final RateLimiter rateLimiter = RateLimiter.create(5.0);

@GetMapping("/data")
public String fetchData() {
if (!rateLimiter.tryAcquire()) {
return "Too many requests. Please try again later.";
}
// 正常处理逻辑
return "Data fetched successfully!";
}
}

1.2 实现 Token Bucket Limiter

import java.util.concurrent.ConcurrentHashMap;

/**
* Notification Limit 设计类
* - 设计一个系统,每用户每分钟只能收到不超过 N 条通知
* - 支持 API:canSend(userId: String, timestamp: Long): boolean
*/

/**
* 通用令牌桶限流器(Token Bucket Rate Limiter)。
*
* 用于控制任意 key(如用户 ID、IP 地址、设备号)在固定时间窗口内的最大请求频率。
* 支持突发请求,适用于 API 调用控制、消息发送频控等场景。
*
* 示例用途:
* - 每用户每分钟最多发送 N 条通知
* - 每个 item 每秒最多展示 M 次
*
* 核心算法:令牌桶(Token Bucket)
**/
public class TokenBucketRateLimiter {

// 每个桶最多可容纳的令牌数量(令牌桶的容量
private final long maxTokens;

// 每毫秒生成的令牌数量
private final double refillRate;

// 每个 key(如 userId)对应一个令牌桶
private final ConcurrentHashMap<String, Bucket> buckets = new ConcurrentHashMap<>();

/**
* 构造限流器
*
* @param maxTokens 每个桶的最大容量(每个 key 最多可执行多少次)
* @param refillIntervalMillis 补充周期,例如 60_000 表示每分钟补满 maxTokens 个令牌
*/
public TokenBucketRateLimiter(long maxTokens, long refillIntervalMillis) {
this.maxTokens = maxTokens;
this.refillRate = (double) maxTokens / refillIntervalMillis;
}

/**
* 判断当前 key(例如 userId)是否允许执行操作
*
* @param key 被限流的标识,例如 userId、itemId
* @param timestampMillis 当前时间戳(毫秒)
* @return true 表示允许执行,false 表示已达限流
*/
public boolean allow(String key, long timestampMillis) {
// 获取桶,如果不存在则创建新桶,初始令牌为满
Bucket bucket = buckets.get(key);
if (bucket == null) {
bucket = new Bucket(maxTokens, timestampMillis);
buckets.put(key, bucket);
}

// 保证每个桶的令牌操作是线程安全的
synchronized (bucket) {
// 计算距离上次补充的时间差
long fromLastRefilled = timestampMillis - bucket.lastRefillTimestamp;

// 计算应补充的令牌数量
long tokensToAdd = (long) (fromLastRefilled * refillRate);

// 如果可以补充,更新令牌数和时间戳
if (tokensToAdd > 0) {
bucket.tokens = Math.min(bucket.tokens + tokensToAdd, maxTokens);
bucket.lastRefillTimestamp = timestampMillis;
}

// 如果桶中还有令牌,允许执行,并消耗一个
if (bucket.tokens > 0) {
bucket.tokens--;
return true;
} else {
// 否则限流
return false;
}
}
}

/**
* 内部类:桶对象,表示每个 key 的限流状态
*/
private static class Bucket {
long tokens; // 当前令牌数量
long lastRefillTimestamp; // 上次补充令牌的时间

Bucket(long tokens, long timestamp) {
this.tokens = tokens;
this.lastRefillTimestamp = timestamp;
}
}

public static void main(String[] args) throws InterruptedException {
// 每秒最多 5 个请求
TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(5, 1000);

String userId = "user-abc";
for (int i = 0; i < 10; i++) {
boolean allowed = limiter.allow(userId, System.currentTimeMillis());
System.out.println("Request " + i + " allowed: " + allowed);
Thread.sleep(100); // 模拟请求间隔
}
}
}

2. Tools needed 准备工作

2.1 Apache Bench

ab is a tool for benchmarking your Apache Hypertext Transfer Protocol (HTTP) server. It is designed to give you an impression of how your current Apache installation performs.

ab 是一个用于对 Apache 超文本传输协议 (HTTP) 服务器进行基准测试的工具

How to use

# -n 表示请求数,-c 表示并发数
ab -n 100 -c 10 http://www.imooc.com/

-n  即requests,用于指定压力测试总共的执行次数。
-c 即concurrency,用于指定的并发数。
-t 即timelimit,等待响应的最大时间(单位:秒)。
-b 即windowsize,TCP发送/接收的缓冲大小(单位:字节)。
-p 即postfile,发送POST请求时需要上传的文件,此外还必须设置-T参数。
-u 即putfile,发送PUT请求时需要上传的文件,此外还必须设置-T参数。
-T 即content-type,用于设置Content-Type请求头信息,例如:application/x-www-form-urlencoded,默认值为text/plain。
-v 即verbosity,指定打印帮助信息的冗余级别。
-w 以HTML表格形式打印结果。
-i 使用HEAD请求代替GET请求。
-x 插入字符串作为table标签的属性。
-y 插入字符串作为tr标签的属性。
-z 插入字符串作为td标签的属性。
-C 添加cookie信息,例如:"Apache=1234"(可以重复该参数选项以添加多个)。
-H 添加任意的请求头,例如:"Accept-Encoding: gzip",请求头将会添加在现有的多个请求头之后(可以重复该参数选项以添加多个)。
-A 添加一个基本的网络认证信息,用户名和密码之间用英文冒号隔开。
-P 添加一个基本的代理认证信息,用户名和密码之间用英文冒号隔开。
-X 指定使用的和端口号,例如:"126.10.10.3:88"。
-V 打印版本号并退出。
-k 使用HTTP的KeepAlive特性。
-d 不显示百分比。
-S 不显示预估和警告信息。
-g 输出结果信息到gnuplot格式的文件中。
-e 输出结果信息到CSV格式的文件中。
-r 指定接收到错误信息时不退出程序。
-h 显示用法信息,其实就是ab -help。

3. Distributed 分布式 - Alibaba Sentinel

GitHub: https://github.com/alibaba/Sentinel

As distributed systems are becoming increasingly popular, the reliability between services is becoming more important than ever before. Sentinel is a powerful flow control component that takes "flow" as the breakthrough point and covers multiple fields including flow control, concurrency limiting, circuit breaking, and adaptive system protection to guarantee the reliability of microservices.

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。

3.1 Sentinel Dashboard 控制台

  • 从 release 页面下载控制台的 jar 包
  • 或者下载源码,使用 maven 打包 mvn clean package -DskipTests

Run the Dashboard 启动

# 官方版本命令 not work
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar

# 指定允许 Java 版本后, it works
java -Dserver.port=9000 -Dcsp.sentinel.dashboard.server=localhost:9000 -Dproject.name=sentinel-dashboard --add-opens java.base/java.lang=ALL-UNNAMED -jar sentinel-dashboard-1.8.0.jar

启动后,访问 http://localhost:9000,默认用户名密码都是 sentinel.

References