- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
原文链接:
上一篇文章介绍了 如何实现计数器限流? 主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码.
但是采用固定窗口实现的限流器会有两个问题:
这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题.
算法概念如下:
令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法.
源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现.
// core/limit/tokenlimit.go
// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 当前时间戳
local now = tonumber(ARGV[3])
// 请求数量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填满
local fill_time = capacity/rate
// 向下取整,ttl 为填满时间 2 倍
local ttl = math.floor(fill_time*2)
// 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
last_tokens = capacity
end
// 上次请求时间戳,如果为 nil 则赋值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
last_refreshed = 0
end
// 距离上一次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
// 与桶容量比较,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判断请求数量和桶内 token 数量的大小
local allowed = filled_tokens >= requested
// 被请求消耗掉之后,更新剩余 token 数量
local new_tokens = filled_tokens
if allowed then
new_tokens = filled_tokens - requested
end
// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新时间
redis.call("setex", KEYS[2], ttl, now)
return allowed`
Redis 中主要保存两个 key,分别是 token 数量和刷新时间.
核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许.
限流器初始化:
// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
// 生成 token 速率
rate int
// 桶容量
burst int
store *redis.Redis
// 桶 key
tokenKey string
// 桶刷新时间 key
timestampKey string
rescueLock sync.Mutex
// redis 健康标识
redisAlive uint32
// redis 健康监控启动状态
monitorStarted bool
// 内置单机限流器
rescueLimiter *xrate.Limiter
}
// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
tokenKey := fmt.Sprintf(tokenFormat, key)
timestampKey := fmt.Sprintf(timestampFormat, key)
return &TokenLimiter{
rate: rate,
burst: burst,
store: store,
tokenKey: tokenKey,
timestampKey: timestampKey,
redisAlive: 1,
rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
}
}
其中有一个变量 rescueLimiter ,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮.
提供了四个可调用方法:
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
return lim.AllowNCtx(ctx, time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(context.Background(), now, n)
}
// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
return lim.reserveN(ctx, now, n)
}
最终调用的都是 reverveN 方法:
func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
// 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器
if atomic.LoadUint32(&lim.redisAlive) == 0 {
return lim.rescueLimiter.AllowN(now, n)
}
// 执行限流脚本
resp, err := lim.store.EvalCtx(ctx,
script,
[]string{
lim.tokenKey,
lim.timestampKey,
},
[]string{
strconv.Itoa(lim.rate),
strconv.Itoa(lim.burst),
strconv.FormatInt(now.Unix(), 10),
strconv.Itoa(n),
})
// redis allowed == false
// Lua boolean false -> r Nil bulk reply
if err == redis.Nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
logx.Errorf("fail to use rate limiter: %s", err)
return false
}
if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
// 如果有异常的话,会启动进程内限流
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
code, ok := resp.(int64)
if !ok {
logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
// redis allowed == true
// Lua boolean true -> r integer reply with value of 1
return code == 1
}
最后看一下进程内限流的启动与恢复:
func (lim *TokenLimiter) startMonitor() {
lim.rescueLock.Lock()
defer lim.rescueLock.Unlock()
// 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动
if lim.monitorStarted {
return
}
lim.monitorStarted = true
atomic.StoreUint32(&lim.redisAlive, 0)
go lim.waitForRedis()
}
func (lim *TokenLimiter) waitForRedis() {
ticker := time.NewTicker(pingInterval)
// 更新监控进程的状态
defer func() {
ticker.Stop()
lim.rescueLock.Lock()
lim.monitorStarted = false
lim.rescueLock.Unlock()
}()
for range ticker.C {
// 对 redis 进行健康监测,如果 redis 服务恢复了
// 则更新 redisAlive 标识,并退出 goroutine
if lim.store.Ping() {
atomic.StoreUint32(&lim.redisAlive, 1)
return
}
}
}
以上就是本文的全部内容,如果觉得还不错的话欢迎 点赞 , 转发 和 关注 ,感谢支持.
参考文章:
推荐阅读:
最后此篇关于go-zero是如何实现令牌桶限流的?的文章就讲到这里了,如果你想了解更多关于go-zero是如何实现令牌桶限流的?的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
翻译: 用法:zeros(shape, dtype=float, order='C') 返回:返回来一个给定形状和类型的用0填充的数组; 参数:shape:形状 dtype:数据类型,可选参
我想像这样格式化一个 double: 1.23 => 1.2 1.0 => 1 0.4 => 0.4 0 => 0 对应的字符串格式是什么?我目前正在使用 StringFormat={}{0
在 simple geometric program 中用 Javascript 和 Canvas 编写,当我将 Angular 设置为 270° (1½π) 时,我预计 Math.cos(θ) 会变
我们有一些基于 Linux (Centos) 的虚拟机,它们将用作可分发的虚拟设备。我们希望能够尽可能地压缩它们以便分发(通过 tar.gz、zip 等)。 我们删除了所有不必要的文件(.log's、
之前有一个问题,它得到了答案: 感谢那。 “现在我已经格式化了我的单元格: h "小时"m "分钟" 因此,如果我的单元格有 7:00,它会显示为 7 小时 0 分钟。如果小时或分钟为零,有没有办法删
这个问题已经有答案了: C program to convert Fahrenheit to Celsius always prints zero (6 个回答) 已关闭 4 年前。 我的以下简单编码
我有一个类的以下代码。这是一个类的初始化。 第三方动态链接库 [DllImport("gdi32.dll")] public static extern IntPtr CreateCompatib
这是我书中的一段代码,我不确定匹配是如何工作的,因为它似乎第一个案例匹配所有内容。以下是 Ocaml 向我提出的警告: # let zero = 0;; # let one = 1;; # let r
我正在尝试重构一些现有代码into a more monodic approach 。现有代码包含接口(interface) IXInterface 和数字,例如 int 和 bool。默认情况下,数
我一直在考虑单词序列的 0 填充以及如何将 0 填充转换为嵌入层。乍一看,人们会认为您也希望保持嵌入 = 0.0。但是,keras 中的嵌入层会为任何输入标记生成随机值,并且无法强制其生成 0.0。请
我正在尝试使用 Pandas 解决以下 python 面试问题: 给定一个 m x n 矩阵,如果一个元素为 0,则将其整个行和列设置为 0。就地执行。 这里有一些例子: # Example 1 [[
我正在优化我正在编写的程序中最耗时的循环,该循环对数组中的许多条目求和,其中许多条目将为零。在添加之前检查条目是否为零或跳过检查并添加所有条目是否更快?下面每一个的例子。这是在 C++ 中。谢谢! d
之前(作为菜鸟)我将它作为 R 包错误提交,让我由你们来运行它。我认为以下所有内容都很好: replace_number("123 0 boogie") [1] "one hundred twenty
默认情况下,在BPI零M2上禁用eth0。。在这里,我们将演示如何启用它
我有一个 PG 数据库表价格。结构如下: id name total_sales created_at 1 A 0.0 2016-01-01
这个问题在这里已经有了答案: Difference between numpy.array shape (R, 1) and (R,) (8 个答案) 关闭 6 年前。 有什么区别 numpy.ze
是否可以通过 Skype 用户窗口获取处理程序并使用 SendMessage(whdl,BM_CLICK,intptr.zero,intrptr.zero,intptr.zero) 单击发送文件或调用
我使用开箱即用的 MVC 4 简单成员资格。我对网站做了很多修改,现在我要回去清理,我发现我不能再修改我的密码了。我一定是视而不见,因为我认为这应该很容易解决,但我只花了 2 天时间解决这个问题。 我
我是CorePlot的新手,终于搞定了一些散线图显示。如何将 X 轴设置为零并位于图形底部,将 Y 轴设置为零并位于图形左侧? 最佳答案 将 plotSpace 的 xRange 设置为 plotSp
我已经为数据表实现了 LazyLoading。当我使用分页浏览数据表时,出现以下异常。 com.sun.faces.context.PartialViewContextImpl processPart
我是一名优秀的程序员,十分优秀!