关于 API 限流:令牌桶到底怎么写才稳?

查看 21|回复 2
作者:matters   
最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验:
1. 为什么弃用定时器?
早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。
但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。
——优化点: 改用延迟计算。
不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。
2. 重视“权重( Weight )”
有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。

  • 下单:5 weight

  • 查深度:2 weight

  • 批量撤单:10 weight

    所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。
    3. 处理网络抖动( Jitter )带来的假限流
    理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。
    ——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。
    4. Python 示例
    以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算:
    import time
    import threading
    class AllTickLimiter:
        def __init__(self, capacity: float, rate: float):
            """
            :param capacity: 桶容量(最大允许的突发请求权重)
            :param rate: 令牌恢复速率(每秒恢复的权重数)
            """
            self.capacity = float(capacity)
            self.rate = float(rate)
            self.tokens = float(capacity)
            self.last_tick = time.monotonic()
            self._lock = threading.Lock()
        def allow(self, weight: float = 1.0) -> bool:
            """
            检查当前令牌是否足够支付本次请求的权重
            """
            with self._lock:
                now = time.monotonic()
                # 1. 延迟计算:计算自上次请求以来生成的令牌
                delta = (now - self.last_tick) * self.rate
                self.tokens = min(self.capacity, self.tokens + delta)
                self.last_tick = now
                # 2. 尝试消费
                if self.tokens >= weight:
                    self.tokens -= weight
                    return True
                return False
        def sync_from_header(self, server_remaining: float):
            """
            利用响应头中的权威剩余量进行校准
            防止本地计算与服务端由于网络延迟导致的偏差
            """
            with self._lock:
                # 强制同步服务端返回的剩余额度
                self.tokens = min(self.capacity, server_remaining)
                self.last_tick = time.monotonic()
    # --- 实战调用示例 —
    # 假设你的 API 套餐是每秒 10 个 Token
    limiter = AllTickLimiter(capacity=20, rate=10)
    def get_market_data(symbol: str):
        # 假设查询实时报价权重为 1
        weight = 1.0
       
        if limiter.allow(weight):
            # 模拟 AllTick API 请求
            # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}")
            # data = response.json()
            
            print(f"[{symbol}] 请求成功")
            
            # 进阶操作:从 Header 获取服务端权威数据进行同步
            # remaining = float(response.headers.get("X-RateLimit-Remaining", 20))
            # limiter.sync_from_header(remaining)
        else:
            print(f"[{symbol}] 触发本地限流,请求被拦截")
    # 模拟快速并发请求
    for i in range(15):
        get_market_data("BTCUSDT")
    5. 分布式下的抉择
    如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

    限流, 令牌桶, 权重

  • lp7631010   
    ai 给你刷刷刷写出来了
    aababc   
    好像之前见过一个 GCRA 的算法可以参考一下
    您需要登录后才可以回帖 登录 | 立即注册

    返回顶部