四畳半神话大系

系统限流相关操作

限流的四种策略

固定窗口

在固定时间段内(1min 或 30s 或 10s) 限制一定量的请求次数,达到阈值后触发相关干预,直到下个时间段,重置请求量,流量恢复。

1
2
3
4
5
6
7
// 全局变量
int totalCount = 0;
if(totalCount > 限流阈值){
return ; // 不再继续处理
}
totalCount++;
// ....

固定窗口需要注意并发问题, 设置的限流阈值的周期应该尽可能的短,相应的限流阈值上限应该也要减小。甚至可以用 平均并发数 * 固定时间
缺点是,固定窗口,要么是提前达到阈值上限,剩余时间周期的请求都被限制;要么达不到上限,资源无法充分利用

滑动窗口

将固定窗口粒度切分更细,如 1 分钟 60 个滑动窗口,随时间推移滑动窗口同步后移。

1
2
3
4
5
6
7
8
9
10
全局链表 counterList = new 链表[切分的滑动窗口数]
// 设定一个定时器,每一次统计时间段的起点需要变化时候,就将索引0位置的元素移除,并在末端追加一个新元素.
// 比如 当前时间描述 % 切分的滑动窗口数 获得当前索引下标,在此下标之前的数组下标移除
int sum = counterList.Sum();
if (sum > 限流阈值){
return; // 不继续请求
}
int 当前索引 = 当前时间秒数 % 切分的滑动窗口数
counterList[当前索引] ++ ;
// do something

漏桶

漏桶模式的核心是固定 出口 的速率, 不管进来多少量,出去的速率都是一定的。如果涌入过多的量,多到桶都装不下,就进行流量干预。
过程:

  1. 控制流出速率,可以用窗口模式实现,如果当前速率小于阈值则直接处理请求,否则不直接处理请求,进入缓冲区,并增加水位。
  2. 缓冲的实现可以用短暂休眠 或 记录到一个容器内再做异步重试
  3. 当桶水位超过最大水位,不处理请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
全局变量 int unitSpeed;  // 当前出口的流出速率,每隔一个速率计算周期 (1s) 重置该值
全局变量 int waterLevel; // 当前缓冲区水位
if (unitSeep < 速率阈值){
unitSpeed ++;
// do something ...
}else{
if (waterLevel > 水位阈值){
return; // 不再处理请求
}
waterLevel ++ ;
while(unitSpeed >= 速率阈值) {
sleep(一小段时间);
}
unitSpeed ++ ;
waterLevel --;
// do something....
}

令牌桶

令牌桶模式的核心是固定的进口速率。先拿到令牌,在处理请求。拿不到令牌就进行流量干预。因此大流量进入时, 生产令牌的速度大于等于被处理的速度, 此刻就是程序处理能力的极限。

1
2
3
4
5
6
全局变量 int tokenCount = 令牌数阈值;  // 可用令牌数, 有一个独立线程使用固定评率增加这个数值, 但不大于 令牌阈值
if (tokenCount == 0) {
return;
}
tokenCount -- ;
// do something ...

Celery 中用python实现了一个令牌桶类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""Token bucket implementation for rate limiting."""
from __future__ import absolute_import, unicode_literals
from collections import deque
from kombu.five import monotonic

__all__ = ['TokenBucket']
class TokenBucket(object):
fill_rate = None
capacity = 1
timestamp = None

def __init__(self, fill_rate, capacity=1):
self.capacity = float(capacity)
self._tokens = capacity
self.fill_rate = float(fill_rate)
self.timestamp = monotonic()
self.contents = deque()

def add(self, item):
self.contents.append(item)

def pop(self):
return self.contents.popleft()

def clear_pending(self):
self.contents.clear()

def can_consume(self, tokens=1):
if tokens <= self._get_tokens():
self._tokens -= tokens
return True
return False

def expected_time(self, tokens=1):
_tokens = self._get_tokens()
tokens = max(tokens, _tokens)
return (tokens - _tokens) / self.fill_rate

def _get_tokens(self):
if self._tokens < self.capacity:
now = monotonic()
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens

eve 中实现了 令牌桶的装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 利用 redis 设置过期时间来限制 令牌桶的周期
def ratelimit():
def decorator(f):
@wraps(f)
def rate_limited(*args, **kwargs):
method_limit = app.config.get('RATE_LIMIT_' + request.method)
if method_limit and app.redis:
limit = method_limit[0]
period = method_limit[1]
key = 'rate-limit/%s' % (request.authorization.username
if request.authorization else
request.remote_addr)
rlimit = RateLimit(key, limit, period, True)
if rlimit.over_limit:
return Response('Rate limit exceeded', 429)
g._rate_limit = rlimit
else:
g._rate_limit = None
return f(*args, **kwargs)
return rate_limited
return decorator

# 令牌类
class RateLimit(object):
def __init__(self, key, limit, period, send_x_headers=True):
self.reset = int(time.time()) + period
self.key = key
self.limit = limit
self.period = period
self.send_x_headers = send_x_headers
p = app.redis.pipeline()
p.incr(self.key)
# 设置 令牌桶的过期时间
p.expireat(self.key, self.reset)
self.current = p.execute()[0]

remaining = property(lambda x: x.limit - x.current)
over_limit = property(lambda x: x.current > x.limit)
# 使用

其他

在 nginx 可以使用 ngx_http_limit_conn_modulengx_http_limit_req_module 这两个模块来进行限流

ngx_http_limit_conn_module 限流

安装

1
2
3
# 第三方模块编译
./configure --prefix=/usr/local/nginx --add-module=/opt/nginx_limit_speed_module-master/
make && make install

配置

  • limit_conn_zone
    limit_conn_zone $variable zone=name:size; 配置在 http 块上,例如 limit_conn_zone $binary_remote_addr zone=ipconc:10m; binary_remote_addr 的变量固定为4字节,1M 共享空间可以保存 3.2万个 32 位状态,1.6 万个 64 位状态。 zone定义区域名,size定义内存共享空间大小。当共享空间耗尽,服务器会返回 503
  • limit_conn
    limit_conn zone_name number 配置在 http、server、location 块上。例如limit_conn ipconc 20; 指定键值最大同时连接数(同一 IP 同一时间只允许 20 个连接), 超过则会返回 503
  • limit_req_zone
    limit_req_zone $variable zone=name: size rate=rate; 配置在 http 块。例如limit_req_zone $binary_remote_addr zone=allips:10m rate=15r/s; 该命令设置一块共享内存限制域来保存键值的状态参数。当请求速率超过限制时,返回 limit_req_status 默认值为 503,限制域空间耗尽后,后续请求返回 503
  • limit_req
    limit_req zone=name burst=number [nodelay]; 配置在 http、server、location 块上, 例如limit_req zone=allips burst=5 nodelay; burst 表示请求队列长度, 每秒可以处理 rate + burst 个请求

本文发表于 2019-01-14,最后修改于 2020-04-04。

本站永久域名blog.amoyiki.com,也可搜索「 四畳半神话大系 」找到我。

期待关注我的 ,查看最近的文章和动态。


上一篇 « Git hook 实现自动部署 下一篇 » Linux 挂着硬盘

推荐阅读

Big Image