- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
注:本文隶属于《理解ASP.NET Core》系列文章,请查看置顶博客或 点击此处查看全文目录 。
在微服务化的架构设计中,网关扮演着重要的看门人角色,它所提供的功能之一就是 限流 。而对于众多非微服务化的系统来说,可能并不会部署网关(无论是因为成本还是复杂度),在这种场景下,为了实现 限流 ,微软在 .NET 7 中提供了官方的限流中间件。下面我们一起来看一下.
首先,确保你的应用依赖的 SDK 版本 >= 7,接着通过 AddRateLimiter 扩展方法注册限流服务,并添加限流策略,然后通过 UseRateLimiter 启用限流中间件,最后配置某个路由的请求使用限流策略:
builder.Services.AddRateLimiter(limiterOptions =>
{
// 配置限流策略
});
app.UseRateLimiter();
app.MapGet("LimitTest", async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
return Results.Ok($"Limiter");
}).RequireRateLimiting("my_policy");
微软为我们提供了 4 种常用的限流算法:
我们通常会注册一个命名限流策略,并在该策略内指定限流算法,以及其他限流逻辑.
另外,需要关注一下 UseRateLimiter 的调用位置。若限流行为作用于特定路由,则限流中间件必须放置在 UseRouting 之后.
固定窗口限流器是一种简单的限流方式:
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddFixedWindowLimiter(policyName: "fixed", fixedOptions =>
{
fixedOptions.PermitLimit = 4;
fixedOptions.Window = TimeSpan.FromSeconds(60);
fixedOptions.QueueLimit = 2;
fixedOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
fixedOptions.AutoReplenishment = true;
});
});
public sealed class FixedWindowRateLimiterOptions
{
public TimeSpan Window { get; set; } = TimeSpan.Zero;
public bool AutoReplenishment { get; set; } = true;
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我们通过 AddFixedWindowLimiter 添加了一个固定窗口限流策略,并指定策略名为 fixed 。它的含义是窗口时间长度为60s,在每个窗口时间范围内,最多允许4个请求被处理.
各配置项含义如下:
TimeSpan.Zero
RejectionStatusCode
true
。如果设置为 false
,则需要手动调用 FixedWindowRateLimiter.TryReplenish
来重置 滑动窗口限流器是固定窗口限流器的升级版:
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddSlidingWindowLimiter(policyName: "sliding", slidingOptions =>
{
slidingOptions.PermitLimit = 100;
slidingOptions.Window = TimeSpan.FromSeconds(30);
slidingOptions.QueueLimit = 2;
slidingOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
slidingOptions.AutoReplenishment = true;
slidingOptions.SegmentsPerWindow = 3;
});
});
public sealed class SlidingWindowRateLimiterOptions
{
public TimeSpan Window { get; set; } = TimeSpan.Zero;
public int SegmentsPerWindow { get; set; }
public bool AutoReplenishment { get; set; } = true;
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我们通过 AddSlidingWindowLimiter 添加了一个滑动窗口限流策略,并指定策略名为 sliding 。它的含义是窗口时间长度为30s,在每个窗口时间范围内,最多允许100个请求,窗口段数为 3,每个段的时间间隔为 30s / 3 = 10s,即窗口每 10s 滑动一段.
各配置项含义如下:
TimeSpan.Zero
true
。如果设置为 false
,则需要手动调用 SlidingWindowRateLimiter.TryReplenish
来重置 为了更好地理解滑动窗口限流器的工作原理,下面我会借用官方文档提供的一张图来详细解释一下:
假设 :限制每个窗口的请求数为 100,窗口时间为 30s,每个窗口的段数为 3,那么每个段的时间间隔就是 30s / 3 = 10s。 定义 :当前段结存请求数 = 当前段可用请求数 - 处理请求数 + 回收请求数 。
限流器工作流程:
令牌桶限流器是一种限制数据平均传输速率的限流算法:
以下图为例,桶内有 3 个令牌(token),进来了 5 个请求,前三个请求可以拿到令牌(token),它们会被处理,后面两个就只能排队或被限流拒绝.
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddTokenBucketLimiter(policyName: "token_bucket", tokenBucketOptions =>
{
tokenBucketOptions.TokenLimit = 4;
tokenBucketOptions.ReplenishmentPeriod = TimeSpan.FromSeconds(10);
tokenBucketOptions.TokensPerPeriod = 2;
tokenBucketOptions.QueueLimit = 2;
tokenBucketOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
tokenBucketOptions.AutoReplenishment = true;
});
});
public sealed class TokenBucketRateLimiterOptions
{
public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;
public int TokensPerPeriod { get; set; }
public bool AutoReplenishment { get; set; } = true;
public int TokenLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我们通过 AddTokenBucketLimiter 添加了一个令牌桶限流策略,并指定策略名为 token_bucket 。它的含义是桶最多可以装 4 个令牌,每 10s 发放一次令牌,每次发放 2 个令牌,所以在一个发放周期内,最多可以处理 4 个请求,至少可以处理 2 个请求 。
各配置项含义如下:
TimeSpan.Zero
true
。如果设置为 false
,则需要手动调用 TokenBucketRateLimiter.TryReplenish
来发放 并发限流器不是限制一段时间内的最大请求数,而是限制并发数:
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddConcurrencyLimiter(policyName: "concurrency", concurrencyOptions =>
{
concurrencyOptions.PermitLimit = 4;
concurrencyOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
concurrencyOptions.QueueLimit = 2;
});
});
public sealed class ConcurrencyLimiterOptions
{
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我们通过 AddConcurrencyLimiter 添加了一个并发限流策略,并指定策略名为 concurrency 。它的含义是最多可以并发4个请求被处理.
各配置项含义如下:
上面已经把常用的限流算法介绍完了,下面来看一下可以通过 limiterOptions 进行哪些配置:
public sealed class RateLimiterOptions
{
// 仅保留了常用的配置项,其他相关代码均忽略
// 全局限流器
public PartitionedRateLimiter<HttpContext>? GlobalLimiter { get; set; }
// 当请求被限流拒绝时执行
public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; set; }
// 当期你去被限流拒绝时的 Http 响应状态码
public int RejectionStatusCode { get; set; } = StatusCodes.Status503ServiceUnavailable;
}
通过 GlobalLimiter ,我们可以设置全局限流器,更准确的说法是全局分区限流器,该限流器会应用于所有请求。执行顺序为先执行全局限流器,再执行特定于路由终结点的限流器(如果存在的话).
需要注意的是,相对于上面注册的限流策略来说, GlobalLimiter 已经是一个限流器实例了,所以需要分配给他一个分区限流器实例,通过 PartitionedRateLimiter.Create 来创建.
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, IPAddress>(context =>
{
IPAddress? remoteIpAddress = context.Connection.RemoteIpAddress;
// 针对非回环地址限流
if (!IPAddress.IsLoopback(remoteIpAddress!))
{
return RateLimitPartition.GetTokenBucketLimiter
(remoteIpAddress!, _ =>
new TokenBucketRateLimiterOptions
{
TokenLimit = 4,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 2,
ReplenishmentPeriod = TimeSpan.FromSeconds(10),
TokensPerPeriod = 10,
AutoReplenishment = true
});
}
// 若为回环地址,则不限流
return RateLimitPartition.GetNoLimiter(IPAddress.Loopback);
});
});
它并不是一个新类型的限流器,而是可以将我们上面提到的分区限流器进行组合而得到一个新的分区限流器.
例如我可以将包含固定窗口限流逻辑的分区限流器和将包含并发限流逻辑的分区限流器组合进行组合,那么应用该限流器的请求就会先被固定窗口限流器处理,再被并发限流器处理,任意一个被限流,就会被拒绝.
var chainedLimiter = PartitionedRateLimiter.CreateChained(
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
var userAgent = httpContext.Request.Headers.UserAgent.ToString();
return RateLimitPartition.GetFixedWindowLimiter
(userAgent, _ =>
new FixedWindowRateLimiterOptions
{
AutoReplenishment = true,
PermitLimit = 4,
Window = TimeSpan.FromSeconds(2)
});
}),
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
var userAgent = httpContext.Request.Headers.UserAgent.ToString();
return RateLimitPartition.GetConcurrencyLimiter
(userAgent, _ =>
new ConcurrencyLimiterOptions
{
PermitLimit = 4,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 2
});
})
);
目前链式组合的限流器只能用于全局限流器,而不能用于终结点限流器.
通过 RejectionStatusCode ,我们可以设置请求被限流拒绝后,http默认的响应状态码。默认为 503 服务不可用,我们可以指定为 429 过多的请求.
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.RejectionStatusCode = StatusCodes.Status429TooManyRequests;
});
另外,该状态码可以在 OnRejected 中被重写,具体参见下小节.
当请求被限流时,会触发回调 OnRejected ,通过该委托我们可以针对 http 响应进行自定义配置:
Retry-After
,指示多长时间后重试请求。需要注意的是,并发限流器无法获取到 RetryAfter,因为它不是时间段的限流,而是限制的并发数
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.OnRejected = (context, cancellationToken) =>
{
if (context.Lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
{
context.HttpContext.Response.Headers.RetryAfter =
((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo);
}
// 可以重新设置响应状态码,会覆盖掉上面设置的 limiterOptions.RejectionStatusCod
context.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.HttpContext.RequestServices.GetService<ILoggerFactory>()?
.CreateLogger("Microsoft.AspNetCore.RateLimitingMiddleware")
.LogWarning("OnRejected: {GetUserEndPoint}", GetUserEndPoint(context.HttpContext));
return ValueTask.CompletedTask;
};
});
上述提到的限流策略,并不能满足我们所有的需求,所以了解如何自定义限流策略是我们的必修课.
在开始编码之前,你需要了解以下内容:
AddXXXLimiter
添加的限流策略,内部实际上调用了 AddPolicy
(后面的部分会详细介绍) AddXXXLimiter
添加的限流策略,每种策略只有一个分区,即使用了该限流策略的路由共享一个分区。例如通过 AddFixedWindowLimiter
添加了限流策略“fixed”,窗口阈值为 10,并有 10 个路由使用了该策略,那么在一个窗口内,这 10 个路由总的请求数达到 10,那这 10 个路由后续的请求都会被限流。 下面我们就借助 AddPolicy ,分别使用两种方式添加一个自定义策略“my_policy”:一个用户一个分区,匿名用户共享一个分区 。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddPolicy(policyName: "my_policy", httpcontext =>
{
var userId = "anonymous user";
if (httpcontext.User.Identity?.IsAuthenticated is true)
{
userId = httpcontext.User.Claims.First(c => c.Type == "id").Value;
}
return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new
FixedWindowRateLimiterOptions
{
PermitLimit = 3,
Window = TimeSpan.FromSeconds(60),
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 0
});
});
});
public interface IRateLimiterPolicy<TPartitionKey>
{
// 若不为空,则执行它(不会执行全局的),如果它为空,则执行全局的
Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }
// 获取限流分区
RateLimitPartition<TPartitionKey> GetPartition(HttpContext httpContext);
}
public class MyRateLimiterPolicy : IRateLimiterPolicy<string>
{
// 可以通过依赖注入参数
public MyRateLimiterPolicy(ILogger<MyRateLimiterPolicy> logger)
{
// 可以设置自己的限流拒绝回调逻辑,而不使用上面全局设置的 limiterOptions.OnRejected
OnRejected = (ctx, token) =>
{
ctx.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
logger.LogWarning($"Request rejected by {nameof(MyRateLimiterPolicy)}");
return ValueTask.CompletedTask;
};
}
public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }
public RateLimitPartition<string> GetPartition(HttpContext httpContext)
{
var userId = "anonymous user";
if (httpContext.User.Identity?.IsAuthenticated is true)
{
userId = httpContext.User.Claims.First(c => c.Type == "id").Value;
}
return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new
FixedWindowRateLimiterOptions
{
PermitLimit = 3,
Window = TimeSpan.FromSeconds(60),
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 0
});
}
}
// 记得注册它
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddPolicy<string, MyRateLimiterPolicy>(policyName: "my_policy");
}
可以一次性为所有 controller 应用限流策略 。
app.MapControllers().RequireRateLimiting("fixed");
也可以为指定路由应用限流策略 。
app.MapGet("LimitTest", () =>{ }).RequireRateLimiting("fixed");
实质上, RequireRateLimiting 和 DisableRateLimiting 是通过向终结点元数据中 EnableRateLimiting 和 DisableRateLimiting 两个特性来实现的.
public static class RateLimiterEndpointConventionBuilderExtensions
{
public static TBuilder RequireRateLimiting<TBuilder>(this TBuilder builder, string policyName) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(policyName)));
return builder;
}
public static TBuilder RequireRateLimiting<TBuilder, TPartitionKey>(this TBuilder builder, IRateLimiterPolicy<TPartitionKey> policy) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder =>
{
endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(new
DefaultRateLimiterPolicy(
RateLimiterOptions.ConvertPartitioner<TPartitionKey>(null, policy.GetPartition), policy.OnRejected)));
});
return builder;
}
public static TBuilder DisableRateLimiting<TBuilder>(this TBuilder builder) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(DisableRateLimitingAttribute.Instance));
return builder;
}
}
在 Controller 层面,我们可以方便的使用特性来标注使用或禁用限流策略。这两个特性可以标注在 Controller 类上,也可以标注在类的方法上.
但需要注意的时,如果前面使用了 RequireRateLimiting 或 DisableRateLimiting 扩展方法,由于它们在元数据中添加特性比直接使用特性标注要晚,所以它们的优先级很高,会覆盖掉这里使用的策略。建议不要针对所有 Controller 使用 RequireRateLimiting 或 DisableRateLimiting .
下面是一个应用示例:
[EnableRateLimiting("fixed")] // 针对整个 Controller 使用限流策略 fixed
public class WeatherForecastController : ControllerBase
{
// 会使用 Controller 类上标注的 fixed 限流策略
[HttpGet(Name = "GetWeatherForecast")]
public string Get() => "Get";
[HttpGet("Hello")]
[EnableRateLimiting("my_policy")] // 会使用 my_policy 限流策略,而不会使用 fixed
public string Hello() => "Hello";
[HttpGet("disable")]
[DisableRateLimiting] // 禁用任何限流策略
public string Disable() => "Disable";
}
为了方便理解接下来的内容,先明确几个容易混淆的类型的概念:
TKey
表示分区的 Key,被同一限流分区作用的请求会互相影响,不同限流分区则不影响。 RateLimitPartition<TKey>
TResource
表示被限流的资源类型,比如 Http 请求类型为 HttpContext
。限流中间件就是通过它来进行限流操作的。 PartitionedRateLimiter<TResource>
篇幅所限,下方示例列出的源码会忽略一部分非核心代码.
AddRateLimiter 很简单,只是单纯的进行选项配置:
public static class RateLimiterServiceCollectionExtensions
{
public static IServiceCollection AddRateLimiter(this IServiceCollection services, Action<RateLimiterOptions> configureOptions)
{
services.Configure(configureOptions);
return services;
}
}
以下仅以 AddFixedWindowLimiter 为例进行讲解,其他三个都是类似的.
public static class RateLimiterOptionsExtensions
{
public static RateLimiterOptions AddFixedWindowLimiter(this RateLimiterOptions options, string policyName, Action<FixedWindowRateLimiterOptions> configureOptions)
{
var key = new PolicyNameKey() { PolicyName = policyName };
var fixedWindowRateLimiterOptions = new FixedWindowRateLimiterOptions();
configureOptions.Invoke(fixedWindowRateLimiterOptions);
fixedWindowRateLimiterOptions.AutoReplenishment = false;
return options.AddPolicy(policyName, context =>
{
return RateLimitPartition.GetFixedWindowLimiter(key,
_ => fixedWindowRateLimiterOptions);
});
}
}
首先是配置选项,可以看到它把 AutoReplenishment 强制设置为了 false ,不对啊,如果这样设置岂不是要我来手动调用 TryReplenish 来重置次数了。其实不然,我们一会看 GetFixedWindowLimiter 的实现就知道原因了.
接着就是调用 AddPolicy ,传入策略名和一个委托来添加策略,该委托会返回一个限流分区,分区内可以通过工厂获取限流器实例。可以看到该策略的分区 key 是固定不变的,即该策略共享一个限流分区.
public static class RateLimitPartition
{
public static RateLimitPartition<TKey> GetFixedWindowLimiter<TKey>(
TKey partitionKey,
Func<TKey, FixedWindowRateLimiterOptions> factory)
{
return Get(partitionKey, key =>
{
FixedWindowRateLimiterOptions options = factory(key);
if (options.AutoReplenishment is true)
{
options = new FixedWindowRateLimiterOptions
{
PermitLimit = options.PermitLimit,
QueueProcessingOrder = options.QueueProcessingOrder,
QueueLimit = options.QueueLimit,
Window = options.Window,
AutoReplenishment = false
};
}
return new FixedWindowRateLimiter(options);
});
}
public static RateLimitPartition<TKey> Get<TKey>(
TKey partitionKey,
Func<TKey, RateLimiter> factory)
=> new RateLimitPartition<TKey>(partitionKey, factory);
可以看到,如果 AutoReplenishment 为 true ,会重新 new 一个新选项,这个新的选项仅仅是将 AutoReplenishment 设置为 false 。为什么呢?这是因为如果它为 true ,那么每一个 FixedWindowRateLimiters 实例(即限流分区)都会有一个自己的定时器来定时补充许可,这无疑是很浪费的。所以将它设置为 false ,由分区限流器中的的定时器来统一管理其下的所有分区,降低资源消耗,不用担心,微软已经帮我们实现好了(具体在 RateLimitingMiddleware 小节中会介绍),不需要自己实现.
策略被保存到 RateLimiterOptions 的 PolicyMap 和 UnactivatedPolicyMap 中,其中:
PolicyMap
是指已经创建了创建了策略实例的限流策略集 UnactivatedPolicyMap
是指还未创建策略实例的限流策略集,它保存的不是策略实例,而是创建策略的委托。这种一般是实现了 IRateLimiterPolicy<TPartitionKey>
接口的策略,我们需要在运行时向它的构造函数注入一些参数。 我们的固定窗口限流器策略显然是存放到 PolicyMap 中,:
public sealed class RateLimiterOptions
{
internal Dictionary<string, DefaultRateLimiterPolicy> PolicyMap { get; }
= new Dictionary<string, DefaultRateLimiterPolicy>(StringComparer.Ordinal);
internal Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>> UnactivatedPolicyMap { get; }
= new Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>>(StringComparer.Ordinal);
public RateLimiterOptions AddPolicy<TPartitionKey>(string policyName, Func<HttpContext, RateLimitPartition<TPartitionKey>> partitioner)
{
// 策略名不能重复
if (PolicyMap.ContainsKey(policyName) || UnactivatedPolicyMap.ContainsKey(policyName))
{
throw ...;
}
PolicyMap.Add(policyName, new DefaultRateLimiterPolicy(ConvertPartitioner<TPartitionKey>(policyName, partitioner), null));
return this;
}
}
可以看到,承载策略的实例类型均为 DefaultRateLimiterPolicy ,即使你是注册的 IRateLimiterPolicy<TPartitionKey> 类型的策略,最终也是会转化为 DefaultRateLimiterPolicy .
现在限流器实例的获取方式已经知道了,那接下来详细看一下 FixedWindowRateLimiter 的详细设计吧.
首先,所有限流器均继承自抽象类 RateLimiter :
public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
public abstract RateLimiterStatistics? GetStatistics();
public abstract TimeSpan? IdleDuration { get; }
public RateLimitLease AttemptAcquire(int permitCount = 1)
=> AttemptAcquireCore(permitCount);
protected abstract RateLimitLease AttemptAcquireCore(int permitCount);
public ValueTask<RateLimitLease> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default)
=> AcquireAsyncCore(permitCount, cancellationToken);
protected abstract ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken);
}
其中,我们需要重点关注以下成员:
当前时间 - A
,当有请求获取许可时,空闲周期就会被置为 null
permitCount
参数,表示想要获取的许可数量,默认值为 1。它所允许的值范围是 >= 0,当传入 0 时,表示查看是否还能获取到许可(不会消耗许可数)。 RateLimitLease
拥有一个 bool IsAcquired
属性,表示许可是否获取成功 permitCount
参数,表示想要获取的许可数量,默认值为 1。它所允许的值范围是 >= 0,当传入 0 时,它会一直等待,直到可以获取到许可,或者再也不能获取到许可了(不会消耗许可数)。 RateLimitLease
拥有一个 bool IsAcquired
属性,表示许可是否获取成功 接着,对于 FixedWindowLimiter 、 SlidingWindowLimiter 和 TokenBucketLimiter 来说,它们都是时间范围的限流算法,都具备 Replenish 性质,所以又抽象出一层 ReplenishingRateLimiter :
public abstract class ReplenishingRateLimiter : RateLimiter
{
// 许可发放周期
public abstract TimeSpan ReplenishmentPeriod { get; }
// 是否自动补充许可
public abstract bool IsAutoReplenishing { get; }
// 尝试补充许可
// 当 AutoReplenishment == true 时,不会执行补充许可的逻辑,因为它是自动的,不允许手动干预
public abstract bool TryReplenish();
}
ConcurrencyLimiter 直接继承自 RateLimiter 。
最后具体看一下 FixedWindowRateLimiter 的详细实现,先来看构造函数以及一些常用属性:
public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter
{
// 用于重新补充许可的定时器
private readonly Timer? _renewTimer;
// 选项,会 clone 一份构造函数传进来的 options
private readonly FixedWindowRateLimiterOptions _options;
// 指示许可租赁成功的结果
private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null);
// 指示许可租赁失败的结果
private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null);
// 空闲周期
public override TimeSpan? IdleDuration => ...;
// 是否自动补充许可
public override bool IsAutoReplenishing => _options.AutoReplenishment;
// 许可发放周期,对于固定窗口来说,就是窗口大小
public override TimeSpan ReplenishmentPeriod => _options.Window;
public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
{
// 省略部分代码...
// 如果 AutoReplenishment == true,则会创建定时器,用于定时补充许可
// 不过我们从前面可以得知,传递到这里的是 false,所以定时器并不会被创建
if (_options.AutoReplenishment)
{
_renewTimer = new Timer(Replenish, this, _options.Window, _options.Window);
}
}
}
接下来是补充许可 TryReplenish 的实现:
public override bool TryReplenish()
{
// 当 AutoReplenishment == true 时,不会执行补充许可的逻辑,因为它是自动的,不允许手动干预
if (_options.AutoReplenishment)
{
return false;
}
Replenish(this);
return true;
}
private static void Replenish(object? state)
{
FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!;
// 获取当前时间
long nowTicks = Stopwatch.GetTimestamp();
limiter!.ReplenishInternal(nowTicks);
}
private void ReplenishInternal(long nowTicks)
{
// 如果当前时间距离上次许可发放时间还没达到窗口大小,则直接返回
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
{
return;
}
int availablePermitCounters = _permitCount;
if (availablePermitCounters >= _options.PermitLimit)
{
// 如果当前可用许可数 >= 限流器配置的最大许可数,则无须重新发放,直接返回
return;
}
// 补充许可
_permitCount = _options.PermitLimit;
// 先处理排队的请求
while (_queue.Count > 0)
{
// 根据 QueueProcessingOrder 从队列中找到(Peek)最老或最新的请求
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();
// 若请求已完成处理,则只需要将它移出队列(Dequeue),并释放资源即可。
// 请求已完成可能的原因如下:
// 1. 已被取消
// 2. 当 QueueProcessingOrder 设置为 NewestFirst 时,新来的请求把老的踢出了队列
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
// 若可用的许可数足够,则从队列中取出请求并处理
else if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
// 扣减
_queueCount -= nextPendingRequest.Count;
_permitCount -= nextPendingRequest.Count;
// 向请求补充许可
// 若发放失败,这还原扣减
if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
{
_permitCount += nextPendingRequest.Count;
_queueCount += nextPendingRequest.Count;
}
// 释放资源
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
// 请求无法被处理,直接跳出
break;
}
}
if (_permitCount == _options.PermitLimit)
{
// 当可用许可数等于配置的最大许可数,则开始计算空闲周期
_idleSince = Stopwatch.GetTimestamp();
}
}
下面一起看一下许可是如何租出去的。由于异步的 AcquireAsyncCore 基本包含了同步的 AttemptAcquireCore 的处理逻辑,所以下面就只看 AcquireAsyncCore 。需要着重说一下的是,同步的 AttemptAcquireCore 是不会进行入队操作的.
源码里面其实有很多锁,为了便于理解我都删除了.
protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
// 当申请的许可数 == 0,并且可用许可数 > 0 时,则直接返回 SuccessfulLease,表示限流器还有可用许可
// 对于同步的 AttemptAcquireCore 方法来说,若此时可用许可数为 0,则会直接返回 FailedLease,表示限流器没有可用许可
if (permitCount == 0 && _permitCount > 0)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
// 尝试租赁
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
return new ValueTask<RateLimitLease>(lease);
}
// 如果队列装不下要申请许可的所有请求
if (_options.QueueLimit - _queueCount < permitCount)
{
// 如果优先处理新来的,并且要申请许可的请求数没有超过队列的大小限制,
// 则将队列中老的请求踢出队列,直到为新来的请求留出足够的空间,准备将新来的请求加进去
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
{
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
// 设置老请求申请许可失败
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
_queueCount += oldestRequest.Count;
}
oldestRequest.CancellationTokenRegistration.Dispose();
}
while (_options.QueueLimit - _queueCount < permitCount);
}
else
{
// 如果优先处理后来的,则只能返回 失败
return new ValueTask<RateLimitLease>(CreateFailedWindowLease(permitCount));
}
}
// 这部分代码不用太在意
CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueState)obj!).TrySetCanceled();
}, tcs);
}
RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr);
// 将新请求加入到队尾
_queue.EnqueueTail(registration);
_queueCount += permitCount;
// 异步可等待,直到 Task 执行完成获取到结果(可能是申请成功,也可能是失败)
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}
TryLeaseUnsynchronized 具体逻辑如下:
private bool TryLeaseUnsynchronized(int permitCount, out RateLimitLease? lease)
{
// 若可用的许可数足够,且不为 0
if (_permitCount >= permitCount && _permitCount != 0)
{
// 租赁的许可为0,则直接返回 成功
if (permitCount == 0)
{
lease = SuccessfulLease;
return true;
}
// 若:
// 1. 没有排队的请求
// 2. 或有排队的请求,但是 QueueProcessingOrder 被设置为 NewestFirst
// 则租赁成功,其他则租赁失败(因为要先把排队的处理完)
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
// 许可租赁出去了,也就表示该限流器不空闲了
_idleSince = null;
_permitCount -= permitCount;
lease = SuccessfulLease;
return true;
}
}
// 租赁失败
lease = null;
return false;
}
现在,我们已经掌握了限流器补充许可和租赁许可的细节逻辑了,并且也得知并没有使用限流器内部的定时器去定时补充许可,那这是由谁补充的呢?又是由谁为请求申请的许可呢?
没错,这都是 RateLimitingMiddleware 负责的.
在构造方法中,我们需要重点关注下 CreateEndpointLimiter ,它创建了终结点分区限流器,与全局限流器一起提供限流服务.
internal sealed partial class RateLimitingMiddleware
{
// 默认被限流拒绝回调的委托,取自 options.OnRejected
private readonly Func<OnRejectedContext, CancellationToken, ValueTask>? _defaultOnRejected;
// 全局限流器,取自 options.GlobalLimiter
private readonly PartitionedRateLimiter<HttpContext>? _globalLimiter;
// 终结点限流器
private readonly PartitionedRateLimiter<HttpContext> _endpointLimiter;
// 限流响应状态码,取自 options.RejectionStatusCode
private readonly int _rejectionStatusCode;
// 限流策略集,取自 options.PolicyMap 和 options.UnactivatedPolicyMap
private readonly Dictionary<string, DefaultRateLimiterPolicy> _policyMap;
public RateLimitingMiddleware(RequestDelegate next, ILogger<RateLimitingMiddleware> logger, IOptions<RateLimiterOptions> options, IServiceProvider serviceProvider, RateLimitingMetrics metrics)
{
// ...省略一堆代码
_endpointLimiter = CreateEndpointLimiter();
}
}
在 CreateEndpointLimiter 方法中,创建了分区限流器,里面包含了各种各样的限流分区,用于不同终结点请求的限流.
private PartitionedRateLimiter<HttpContext> CreateEndpointLimiter()
{
// 创建分区限流器
return PartitionedRateLimiter.Create<HttpContext, DefaultKeyType>(context =>
{
DefaultRateLimiterPolicy? policy;
var enableRateLimitingAttribute = context.GetEndpoint()?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
// 如果不需要限流,则返回 NoLimiter
if (enableRateLimitingAttribute is null)
{
return RateLimitPartition.GetNoLimiter<DefaultKeyType>(_defaultPolicyKey);
}
// 根据限流策略取限流分区
policy = enableRateLimitingAttribute.Policy;
if (policy is not null)
{
return policy.GetPartition(context);
}
var name = enableRateLimitingAttribute.PolicyName;
if (name is not null)
{
if (_policyMap.TryGetValue(name, out policy))
{
return policy.GetPartition(context);
}
else
{
throw new InvalidOperationException($"This endpoint requires a rate limiting policy with name {name}, but no such policy exists.");
}
}
// 虽然策略名或策略不可能为空,但是加一下判断更好
else
{
throw new InvalidOperationException("This endpoint requested a rate limiting policy with a null name.");
}
}, new DefaultKeyTypeEqualityComparer());
}
咦?怎么还是没看到在哪自动补充的许可?实际上它就隐藏在 PartitionedRateLimiter.Create 中的 DefaultPartitionedRateLimiter 里面,藏得太深了:
public static class PartitionedRateLimiter
{
public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
{
return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
}
}
下面是 DefaultPartitionedRateLimiter 启动定时器执行心跳的核心代码:
internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
{
// 限流器集合
private readonly Dictionary<TKey, Lazy<RateLimiter>> _limiters;
// 限流分区委托,可通过资源获取到分区
private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
// 定时器,主要作用是每 100ms 进行一次心跳,即执行 Heartbeat 方法
private readonly TimerAwaitable _timer;
private readonly Task _timerTask;
public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
IEqualityComparer<TKey>? equalityComparer = null)
{
_limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
_partitioner = partitioner;
var timerInterval = TimeSpan.FromMilliseconds(100);
_timer = new TimerAwaitable(timerInterval, timerInterval);
_timerTask = RunTimer();
}
private async Task RunTimer()
{
_timer.Start();
// 只要 timer 不被停止,则一直返回 true,即 timer 仍在运行中
while (await _timer)
{
try
{
await Heartbeat().ConfigureAwait(false);
}
catch { }
}
_timer.Dispose();
}
}
TimerAwaitable 是一个可异步等待的类型(实现了 GetAwaiter 、 INotifyCompletion 、 IsCompleted 和 GetResult ),内部设计非常有意思。在它内部,启动了一个定时器,每 100ms(传入的timerInterval) Tick 一次,每次 Tick 就会把 IsCompleted 设置为 true ,将任务状态切换为已完成。外部通过 await 获取结果时(静默调用 GetResult ),又会将 IsCompleted 设置为 false ,再将其转换为未完成状态。外部再配合 while 以达到定时执行的效果.
为什么不直接用 Timer 而又弄出一个 TimerAwaitable ?我认为 TimerAwaitable 有以下优点:
通过定时器,每 100ms 执行一次心跳,心跳过程中检查各个限流器是否需要补充许可,如果需要,则补充,并回收空闲限流器等。以下是简化的心跳逻辑:
private async Task Heartbeat()
{
if (_cacheInvalid)
{
_cachedLimiters.Clear();
_cachedLimiters.AddRange(_limiters);
}
// 遍历所有缓存的限流器
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> rateLimiter in _cachedLimiters)
{
// 如果限流器还未被实例化,则跳过
if (!rateLimiter.Value.IsValueCreated) continue;
// 如果限流器空闲周期超过了空闲时间限制(默认10s),则回
if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > s_idleTimeLimit)
{
lock (Lock)
{
// 双重检测,确保限流器确实是空闲的
idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero;
if (idleDuration > s_idleTimeLimit)
{
_cacheInvalid = true;
// 回收该限流器
_limiters.Remove(rateLimiter.Key);
// 保存下来,后面一起释放资源
_limitersToDispose.Add(rateLimiter.Value.Value);
}
}
}
// 如果限流器可补充许可,则尝试补充
else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter)
{
try
{
replenishingRateLimiter.TryReplenish();
}
catch (Exception ex) { ... }
}
}
// 释放回收的限流器资源
foreach (RateLimiter limiter in _limitersToDispose)
{
try
{
await limiter.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex) { ... }
}
_limitersToDispose.Clear();
}
好了,我们已经了解了限流器的管理,让我们再次回到 RateLimitingMiddleware ,看看他是如何工作的吧:
public Task Invoke(HttpContext context)
{
var endpoint = context.GetEndpoint();
// 如果终结点包含禁用限流标记,则不限流
if (endpoint?.Metadata.GetMetadata<DisableRateLimitingAttribute>() is not null)
{
return _next(context);
}
var enableRateLimitingAttribute = endpoint?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
// 如果终结点没有启用限流标记,并且全局限流器也是空的,则同样不限流
if (enableRateLimitingAttribute is null && _globalLimiter is null)
{
return _next(context);
}
return InvokeInternal(context, enableRateLimitingAttribute);
}
private async Task InvokeInternal(HttpContext context, EnableRateLimitingAttribute? enableRateLimitingAttribute)
{
var policyName = enableRateLimitingAttribute?.PolicyName;
// 尝试获取许可
using var leaseContext = await TryAcquireAsync(context);
// 如果获取到了许可,则处理请求
if (leaseContext.Lease?.IsAcquired == true)
{
await _next(context);
}
// 没有获取到许可,则限流拒绝
else
{
// 如果请求是被取消的,则不要执行 OnRejected 回调,应该直接返回
if (leaseContext.RequestRejectionReason == RequestRejectionReason.RequestCanceled)
{
return;
}
var thisRequestOnRejected = _defaultOnRejected;
context.Response.StatusCode = _rejectionStatusCode;
// 如果请求是被终结点限流器限流拒绝的
if (leaseContext.RequestRejectionReason == RequestRejectionReason.EndpointLimiter)
{
// 若策略有自己的 OnRejected,则使用策略的,如果没有,则使用 _defaultOnRejected
// 这里我感觉是个 bug,应该判断 policy?.OnRejected is not null 才赋值
DefaultRateLimiterPolicy policy = enableRateLimitingAttribute?.Policy;
if (policy is not null)
{
thisRequestOnRejected = policy.OnRejected;
}
else
{
// 对于策略名,当 OnRejected 不为空时,才使用策略的 OnRejected
if (policyName is not null && _policyMap.TryGetValue(policyName, out policy) && policy.OnRejected is not null)
{
thisRequestOnRejected = policy.OnRejected;
}
}
}
// 执行回调
if (thisRequestOnRejected is not null)
{
await thisRequestOnRejected(new OnRejectedContext() { HttpContext = context, Lease = leaseContext.Lease! }, context.RequestAborted);
}
}
}
TryAcquireAsync 会先从全局限流器获取许可,如果获取到了,则会继续在终结点限流器中获取许可,如果获取到了,请求才会被处理:
异步的 CombinedWaitAsync 与同步的 CombinedAcquire 类似,只不过前面调用的是异步方法,后面是同步,故下方仅列出 CombinedAcquire 简化源码.
private async ValueTask<LeaseContext> TryAcquireAsync(HttpContext context, MetricsContext metricsContext)
{
// 组合获取,即按顺序从全局限流器和终结点限流器中获取许可
var leaseContext = CombinedAcquire(context);
// 如果获取到了,则直接返回
if (leaseContext.Lease?.IsAcquired == true)
{
return leaseContext;
}
// 异步等待再次获取许可
return await CombinedWaitAsync(context, context.RequestAborted);
}
private LeaseContext CombinedAcquire(HttpContext context)
{
// 全局限流器不为空,则先从其中获取许可
if (_globalLimiter is not null)
{
var globalLease = _globalLimiter.AttemptAcquire(context);
// 未获取许可,直接返回
if (!globalLease.IsAcquired)
{
return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
}
}
// 从终结点限流器中获取许可
var endpointLease = _endpointLimiter.AttemptAcquire(context);
// 未获取许可,直接返回
if (!endpointLease.IsAcquired)
{
globalLease?.Dispose();
return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
}
return globalLease is null
? new LeaseContext() { Lease = endpointLease }
: new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
}
AddRateLimiter
注册限流服务,通过 UseRateLimiter
启用限流功能。 options.AddPolicy
添加限流策略,作用于某些终结点,这些策略最终组成的分区限流器称为终结点限流器
options.AddXXXLimiter
的方式快捷添加限流策略 IRateLimiterPolicy<TPartitionKey>
options.GlobalLimiter
设置全局限流器,当请求进入应用时,会先执行全局限流器,再执行终结点限流器。 options.RejectionStatusCode
设置限流拒绝的响应状态码,还可以通过 OnRejected
编写更多的响应逻辑。 PartitionedRateLimiter.CreateChained
将多个分区限流器进行链式组合 最后此篇关于理解ASP.NETCore-限流(RateLimiting)的文章就讲到这里了,如果你想了解更多关于理解ASP.NETCore-限流(RateLimiting)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
Semaphore 是什么 Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。 可以把它简单的理解成我们停车场入口立着的那个显示
★微服务系列 微服务1:微服务及其演进史 微服务2:微服务全景架构 微服务3:微服务拆分策略 微服务4:服务注册与发现 微服务5:服务注册与发现(实践篇) 微服务6:通信之网关 微服务7:通信之
Sentinel 不仅仅可以可以作用于服务之间,还可以完美的和服务网关 GateWay 或者 Zuul 一起使用来对网关实现流控。从 1.6.0 版本开始,Sentinel 提供了 Spring
我是一名优秀的程序员,十分优秀!