LOADING

加载过慢请开启缓存 浏览器默认开启

分布式算法(令牌桶)

2025/2/21 算法

令牌桶算法基本原理

  • 令牌桶算法是一种常用的限流算法,其核心思想是系统以固定的速率向一个虚拟的 “桶” 中放入令牌,每个请求在处理前需要从桶中获取一个令牌,若桶中没有令牌,则请求被限流。

代码实现及解释

结构体定义

首先定义了 RateLimiter 结构体:

  • client:是 Redis 客户端的指针,用于与 Redis 进行交互,实现分布式限流的核心在于利用 Redis 存储令牌桶的信息,这样多个实例可以共享这些信息。
  • rate:表示每秒允许的请求数,即令牌生成的速率。
  • capacity:令牌桶的容量,即桶中最多能存放的令牌数量。

创建限流器实例

// NewRateLimiter 创建一个新的分布式限流器
func NewRateLimiter(client *redis.RedisClient, rate, capacity int) *RateLimiter {
    //log.Printf("初始化令牌桶")
    limiter := &RateLimiter{
        client:   client,
        rate:     rate,
        capacity: capacity,
    }
    // 初始化令牌桶信息
    ctx := context.Background()
    key := "global_rate_limit"
    now := time.Now().Unix()
    err := client.Client.HSet(ctx, key, "tokens", capacity).Err()
    if err != nil {
        log.Printf("初始化令牌桶信息失败: %v", err)
    }
    err = client.Client.HSet(ctx, key, "last_refill", now).Err()
    if err != nil {
        log.Printf("初始化令牌桶信息失败: %v", err)
    }

    return limiter
}

NewRateLimiter 函数用于创建一个新的 RateLimiter 实例,需要传入 Redis 客户端、令牌生成速率和令牌桶容量。

中间件函数

// RateLimitMiddleware 限流器中间件
func RateLimitMiddleware(limiter *RateLimiter, next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // 固定的限流key,表示全局令牌桶
        key := "global_rate_limit"

        // 调用 RateLimiter 的 Allow 方法判断是否允许请求通过
        allowed, err := limiter.Allow(r.Context(), key)
        if err != nil {
            http.Error(w, "限流判断出错", http.StatusInternalServerError)
            return
        }
        if !allowed {
            http.Error(w, "请求过于频繁,请稍后再试", http.StatusTooManyRequests)
            return
        }

        // 允许请求通过,继续处理
        next(w, r)
    }
}
  • RateLimitMiddleware 是一个中间件函数,用于在处理 HTTP 请求时进行限流。
  • 它首先从请求中获取客户端的 IP 地址,并生成一个唯一的限流 key。
  • 然后调用 RateLimiterAllow 方法判断是否允许该请求通过。如果出现错误,返回内部服务器错误;如果不允许通过,返回请求过于频繁的错误;如果允许通过,则继续处理请求。

核心限流逻辑

// Allow 判断是否允许请求通过
func (l *RateLimiter) Allow(ctx context.Context, key string) (bool, error) {
    if ctx.Err() != nil {
        log.Printf("上下文已经取消或超时: %v", ctx.Err())
        return false, ctx.Err()
    }
    // 获取当前时间戳
    now := time.Now().Unix()

    // 定义 Redis 事务
    tx := l.client.Client.TxPipeline()
    defer tx.Close()

    // 从 Redis 中获取令牌桶的信息
    //log.Printf("开始获取令牌桶信息,key: %s", key)
    resp, err := l.client.Client.HGetAll(ctx, key).Result()
    if err != nil {
        if errors.Is(err, redis2.Nil) {
            log.Printf("令牌桶信息不存在,初始化令牌桶")
            // 令牌桶信息不存在,初始化令牌桶
            tokens := l.capacity
            lastRefill := now
            // 保存初始化信息到 Redis
            err := l.client.Client.HSet(ctx, key, "tokens", tokens, "last_refill", lastRefill).Err()
            if err != nil {
                log.Printf("保存令牌桶信息失败: %v", err)
                return false, err
            }
            resp = map[string]string{
                "tokens":      strconv.Itoa(tokens),
                "last_refill": strconv.FormatInt(lastRefill, 10),
            }
        } else {
            log.Printf("获取令牌桶信息失败: %v", err)
            return false, err
        }
    }
    var tokens int
    var lastRefill int64

    // 解析令牌桶信息
    tokens, _ = strconv.Atoi(resp["tokens"])
    lastRefill, _ = strconv.ParseInt(resp["last_refill"], 10, 64)

    // 计算需要补充的令牌数
    elapsed := now - lastRefill
    newTokens := int(elapsed) * l.rate
    tokens = tokens + newTokens
    if tokens > l.capacity {
        tokens = l.capacity
    }

    // 判断是否有足够的令牌
    if tokens < 1 {
        return false, nil
    }

    // 消耗一个令牌
    tokens--

    // 开始事务操作
    tx.HSet(ctx, key, "tokens", tokens)
    tx.HSet(ctx, key, "last_refill", now)

    // 执行 Redis 事务
    _, err = tx.Exec(ctx)
    if err != nil {
        log.Printf("事务执行失败: %v", err)
        return false, err
    }

    return true, nil
}
  • 获取当前时间戳:通过 time.Now().Unix() 获取当前的 Unix 时间戳。
  • 定义 Redis 事务:使用 TxPipeline 开启一个 Redis 事务,确保后续的多个 Redis 操作原子性执行。
  • 获取令牌桶信息:通过 HGetAll 从 Redis 中获取令牌桶的信息,包括令牌数量 tokens 和上次填充时间 lastRefill
  • 初始化令牌桶:如果令牌桶信息不存在,将令牌数量初始化为令牌桶容量,上次填充时间初始化为当前时间。
  • 计算补充令牌数:根据当前时间和上次填充时间的差值,计算需要补充的令牌数,并更新令牌数量。如果超过令牌桶容量,则将令牌数量设置为容量值。
  • 判断令牌是否足够:如果令牌数量小于 1,说明没有可用令牌,返回 false 表示不允许请求通过。
  • 消耗令牌:如果有可用令牌,将令牌数量减 1。
  • 更新令牌桶信息:使用 HSet 更新 Redis 中令牌桶的信息,包括更新后的令牌数量和上次填充时间。
  • 执行 Redis 事务:调用 Exec 执行 Redis 事务,如果出现错误则返回 false

分布式限流的实现

通过使用 Redis 存储令牌桶的信息,多个实例可以共享这些信息,从而实现分布式限流。每个实例在处理请求时,都从 Redis 中获取和更新令牌桶的信息,保证了限流的一致性。

优点和局限性

  1. 优点
  • 可以有效控制请求的速率,防止系统被过多请求压垮。
  • 分布式实现可以在多个实例之间共享限流信息,适用于分布式系统。
  1. 局限性
  • 引入了 Redis 作为外部依赖,增加了系统的复杂性。
  • 由于需要与 Redis 进行交互,会有一定的性能开销。