令牌桶算法基本原理
令牌桶算法是一种常用的限流算法,其核心思想是系统以固定的速率向一个虚拟的 “桶” 中放入令牌,每个请求在处理前需要从桶中获取一个令牌,若桶中没有令牌,则请求被限流。
代码实现及解释
结构体定义
首先定义了 RateLimiter
结构体:
client
:是 Redis 客户端的指针,用于与 Redis 进行交互,实现分布式限流的核心在于利用 Redis 存储令牌桶的信息,这样多个实例可以共享这些信息。rate
:表示每秒允许的请求数,即令牌生成的速率。capacity
:令牌桶的容量,即桶中最多能存放的令牌数量。
创建限流器实例
// NewRateLimiter 创建一个新的分布式限流器
func NewRateLimiter(client *redis.RedisClient, rate, capacity int) *RateLimiter {
//log.Printf("初始化令牌桶")
limiter := &RateLimiter{
client: client,
rate: rate,
capacity: capacity,
}
// 初始化令牌桶信息
ctx := context.Background()
key := "global_rate_limit"
now := time.Now().Unix()
err := client.Client.HSet(ctx, key, "tokens", capacity).Err()
if err != nil {
log.Printf("初始化令牌桶信息失败: %v", err)
}
err = client.Client.HSet(ctx, key, "last_refill", now).Err()
if err != nil {
log.Printf("初始化令牌桶信息失败: %v", err)
}
return limiter
}
NewRateLimiter
函数用于创建一个新的 RateLimiter
实例,需要传入 Redis 客户端、令牌生成速率和令牌桶容量。
中间件函数
// RateLimitMiddleware 限流器中间件
func RateLimitMiddleware(limiter *RateLimiter, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 固定的限流key,表示全局令牌桶
key := "global_rate_limit"
// 调用 RateLimiter 的 Allow 方法判断是否允许请求通过
allowed, err := limiter.Allow(r.Context(), key)
if err != nil {
http.Error(w, "限流判断出错", http.StatusInternalServerError)
return
}
if !allowed {
http.Error(w, "请求过于频繁,请稍后再试", http.StatusTooManyRequests)
return
}
// 允许请求通过,继续处理
next(w, r)
}
}
RateLimitMiddleware
是一个中间件函数,用于在处理 HTTP 请求时进行限流。- 它首先从请求中获取客户端的 IP 地址,并生成一个唯一的限流 key。
- 然后调用
RateLimiter
的Allow
方法判断是否允许该请求通过。如果出现错误,返回内部服务器错误;如果不允许通过,返回请求过于频繁的错误;如果允许通过,则继续处理请求。
核心限流逻辑
// Allow 判断是否允许请求通过
func (l *RateLimiter) Allow(ctx context.Context, key string) (bool, error) {
if ctx.Err() != nil {
log.Printf("上下文已经取消或超时: %v", ctx.Err())
return false, ctx.Err()
}
// 获取当前时间戳
now := time.Now().Unix()
// 定义 Redis 事务
tx := l.client.Client.TxPipeline()
defer tx.Close()
// 从 Redis 中获取令牌桶的信息
//log.Printf("开始获取令牌桶信息,key: %s", key)
resp, err := l.client.Client.HGetAll(ctx, key).Result()
if err != nil {
if errors.Is(err, redis2.Nil) {
log.Printf("令牌桶信息不存在,初始化令牌桶")
// 令牌桶信息不存在,初始化令牌桶
tokens := l.capacity
lastRefill := now
// 保存初始化信息到 Redis
err := l.client.Client.HSet(ctx, key, "tokens", tokens, "last_refill", lastRefill).Err()
if err != nil {
log.Printf("保存令牌桶信息失败: %v", err)
return false, err
}
resp = map[string]string{
"tokens": strconv.Itoa(tokens),
"last_refill": strconv.FormatInt(lastRefill, 10),
}
} else {
log.Printf("获取令牌桶信息失败: %v", err)
return false, err
}
}
var tokens int
var lastRefill int64
// 解析令牌桶信息
tokens, _ = strconv.Atoi(resp["tokens"])
lastRefill, _ = strconv.ParseInt(resp["last_refill"], 10, 64)
// 计算需要补充的令牌数
elapsed := now - lastRefill
newTokens := int(elapsed) * l.rate
tokens = tokens + newTokens
if tokens > l.capacity {
tokens = l.capacity
}
// 判断是否有足够的令牌
if tokens < 1 {
return false, nil
}
// 消耗一个令牌
tokens--
// 开始事务操作
tx.HSet(ctx, key, "tokens", tokens)
tx.HSet(ctx, key, "last_refill", now)
// 执行 Redis 事务
_, err = tx.Exec(ctx)
if err != nil {
log.Printf("事务执行失败: %v", err)
return false, err
}
return true, nil
}
- 获取当前时间戳:通过
time.Now().Unix()
获取当前的 Unix 时间戳。 - 定义 Redis 事务:使用
TxPipeline
开启一个 Redis 事务,确保后续的多个 Redis 操作原子性执行。 - 获取令牌桶信息:通过
HGetAll
从 Redis 中获取令牌桶的信息,包括令牌数量tokens
和上次填充时间lastRefill
。 - 初始化令牌桶:如果令牌桶信息不存在,将令牌数量初始化为令牌桶容量,上次填充时间初始化为当前时间。
- 计算补充令牌数:根据当前时间和上次填充时间的差值,计算需要补充的令牌数,并更新令牌数量。如果超过令牌桶容量,则将令牌数量设置为容量值。
- 判断令牌是否足够:如果令牌数量小于 1,说明没有可用令牌,返回
false
表示不允许请求通过。 - 消耗令牌:如果有可用令牌,将令牌数量减 1。
- 更新令牌桶信息:使用
HSet
更新 Redis 中令牌桶的信息,包括更新后的令牌数量和上次填充时间。 - 执行 Redis 事务:调用
Exec
执行 Redis 事务,如果出现错误则返回false
。
分布式限流的实现
通过使用 Redis 存储令牌桶的信息,多个实例可以共享这些信息,从而实现分布式限流。每个实例在处理请求时,都从 Redis 中获取和更新令牌桶的信息,保证了限流的一致性。
优点和局限性
- 优点
- 可以有效控制请求的速率,防止系统被过多请求压垮。
- 分布式实现可以在多个实例之间共享限流信息,适用于分布式系统。
- 局限性
- 引入了 Redis 作为外部依赖,增加了系统的复杂性。
- 由于需要与 Redis 进行交互,会有一定的性能开销。