- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
各位中秋节快乐啊,我觉得在这个月圆之夜有必要写一篇源码解析,以表示我内心的高兴~
在我的第二篇文章里面介绍了整个Sentinel的主流程是怎样的。
所以降级的大致流程可以概述为:
1、 设置降级策略,是根据平均响应时间还是异常比例来进行降级的;
2、 根据资源创建一系列的插槽;
3、 依次调用插槽,根据设定的插槽类型来进行降级;
我们先来看个例子,方便大家自己断点跟踪:
private static final String KEY = "abc";
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
List<DegradeRule> rules = new ArrayList<DegradeRule>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
// set threshold rt, 10 ms
rule.setCount(10);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry entry = null;
try {
TimeUnit.MILLISECONDS.sleep(5);
entry = SphU.entry(KEY);
// token acquired
pass.incrementAndGet();
// sleep 600 ms, as rt
TimeUnit.MILLISECONDS.sleep(600);
} catch (Exception e) {
block.incrementAndGet();
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
其他的流程基本上和第二篇文章里介绍的差不多,这篇文章来介绍Sentinel的主流程,Sentinel的降级策略全部都是在DegradeSlot中进行操作的。
DegradeSlot
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
DegradeSlot会直接调用DegradeRuleManager进行降级的操作,我们直接进入到DegradeRuleManager.checkDegrade方法中。
DegradeRuleManager#checkDegrade
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
//根据resource来获取降级策略
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
这个方法逻辑也是非常的清晰,首先是根据资源名获取到注册过的降级规则,然后遍历规则集合调用规则的passCheck,如果返回false那么就抛出异常进行降级。
DegradeRule#passCheck
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//返回false直接进行降级
if (cut.get()) {
return false;
}
//降级是根据资源的全局节点来进行判断降级策略的
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//根据响应时间降级策略
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//获取节点的平均响应时间
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默认是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
// 根据异常比例降级
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
// 根据异常数降级
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
//根据设置的时间窗口进行重置
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
这个方法首先会去获取cut的值,如果是true那么就直接进行限流操作。然后会根据resource获取ClusterNode全局节点。往下分别根据三种不同的策略来进行降级。
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//获取节点的平均响应时间
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默认是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
}
如果是根据响应时间进行降级,那么会获取clusterNode的平均响应时间,如果平均响应时间大于所设定的count(默认是毫秒),那么就调用passCount加1,如果passCount大于5,那么直接降级。
所以看到这里我们应该知道根据平均响应时间降级前几个请求即使响应过长也不会立马降级,而是要等到第六个请求到来才会进行降级。
我们进入到clusterNode的avgRt方法中看一下是如何获取到clusterNode的平均响应时间的。
clusterNode是StatisticNode的实例
StatisticNode#avgRt
public double avgRt() {
//获取当前时间窗口内调用成功的次数
long successCount = rollingCounterInSecond.success();
if (successCount == 0) {
return 0;
}
//获取窗口内的响应时间
return rollingCounterInSecond.rt() * 1.0 / successCount;
}
```e
这个方法主要是调用rollingCounterInSecond获取成功次数,然后再获取窗口内的响应时间,用总响应时间除以次数得到平均每次成功调用的响应时间。
在[1.Sentinel源码解析—FlowRuleManager加载规则做了什么?](https://www.cnblogs.com/luozhiyun/p/11439993.html)中,我已经具体讲述了StatisticNode里面的rollingCounterInMinute实现原理,rollingCounterInMinute是按分钟进行统计的时间窗口。现在我们来讲一下rollingCounterInSecond按秒来进行统计的时间窗口。
在StatisticNode里面初始化rollingCounterInSecond:
```java
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
在这个初始化的方法里,会传入两个参数,SampleCountProperty.SAMPLE_COUNT的值是2,
IntervalProperty.INTERVAL的值是1000。
我们进入到ArrayMetric的构造方法中:
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
在创建ArrayMetric实例的时候会给data创建一个OccupiableBucketLeapArray实例。
OccupiableBucketLeapArray
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
OccupiableBucketLeapArray继承LeapArray这个抽象类,初始化的时候会调用父类的构造器:
LeapArray
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
//intervalInMs是sampleCount的整数
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
//每个小窗口的时间跨度
this.windowLengthInMs = intervalInMs / sampleCount;
//窗口的长度
this.intervalInMs = intervalInMs;
//窗口个数
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
OccupiableBucketLeapArray在初始化的时候也会创建一个FutureBucketLeapArray实例赋值给borrowArray。
FutureBucketLeapArray也是继承LeapArray:
public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "BorrowBucketArray".
super(sampleCount, intervalInMs);
}
直接通过调用父类LeapArray的构造方法进行初始化。
到这里rollingCounterInSecond的创建过程讲完了。
下面我们再回到StatisticNode中,在调用StatisticNode的avgRt方法的时候会调用rollingCounterInSecond.success()方法获取当前时间窗口的调用成功次数:
ArrayMetric#success
public long success() {
//设置或更新当前的时间窗口
data.currentWindow();
long success = 0;
//获取窗口里有效的Bucket
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
}
这里的data是的父类是LeapArray,LeapArray里面有一个array数组,用来记录时间窗口,在我们这里是基于秒钟的时间窗口,所以array的大小为2。
data的结构图我直接从第一篇 中拿过来:
只不过这里的WindowWrap数组元素只有两个,每一个WindowWrap元素由MetricBucket对象构成,用来统计数据,如:通过次数、阻塞次数、异常次数等~
调用data的currentWindow方法会调用到LeapArray的currentWindow方法中去:
LeapArray#currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//通过当前时间判断属于哪个窗口
int idx = calculateTimeIdx(timeMillis);
//计算出窗口开始时间
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//获取数组里的老数据
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 如果对应时间窗口的开始时间与计算得到的开始时间一样
// 那么代表当前即是我们要找的窗口对象,直接返回
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
//如果当前的开始时间小于原开始时间,那么就更新到新的开始时间
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
//一般来说不会走到这里
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
这里我简单介绍一下这个方法,这个方法的详细讲解已经在第一篇源码解析里做了。
这个方法里面会根据当前的时间戳来计算出array数组里面的index,然后去array数组中找相应的数据,如果节点已经存在,那么用CAS更新一个新的节点;如果节点是新的,那么直接返回;如果节点失效了,设置当前节点,清除所有失效节点。
这里我直接引用 第一篇文章中的例子:
1. 如果array数据里面的bucket数据如下所示:
NULL B4
|_______|_______|
800 1000 1200
^
time=888
正好当前时间所对应的槽位里面的数据是空的,那么就用CAS更新
2. 如果array里面已经有数据了,并且槽位里面的窗口开始时间和当前的开始时间相等,那么直接返回
B3 B4
||_______|_______||___
800 1000 1200 timestamp
^
time=888
3. 例如当前时间是1676,所对应窗口里面的数据的窗口开始时间小于当前的窗口开始时间,那么加上锁,然后设置槽位的窗口开始时间为当前窗口开始时间,并把槽位里面的数据重置
(old)
B0
|_______||_______|
... 1200 1400
^
time=1676
再回到ArrayMetric的success方法中,往下走调用data.values()方法:
LeapArray#success
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
这个方法就是用来获取所有有效的MetricBucket,并返回。
然后通过调用MetricBucket的success方法获取被成功调用的次数。
我们接着来看ArrayMetric的rt方法:
public long rt() {
data.currentWindow();
long rt = 0;
//获取当前时间窗口的统计数据
List<MetricBucket> list = data.values();
//统计当前时间窗口的平均相应时间之和
for (MetricBucket window : list) {
rt += window.rt();
}
return rt;
}
这个方法和上面的success方法差不多,获取所有的MetricBucket的rt数据求和返回。
然后就可以通过rt方法返回的时间总和除以成功调用的总和求得平均数。
我们再回到DegradeRule的passCheck方法中的响应时间降级策略中:
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//获取节点的平均响应时间
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默认是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
// 根据异常比例降级
}
//省略
return false;
如果求得的平均响应时间小于设置的count时间,那么就重置passCount并返回true,表示不抛出异常;如果有连续5次的响应时间都超过了count,那么就返回false抛出异常进行降级。
if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
//获取每秒异常的次数
double exception = clusterNode.exceptionQps();
//获取每秒成功的次数
double success = clusterNode.successQps();
//获取每秒总调用次数
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
// 如果总调用次数少于5,那么不进行降级
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
}
。。。
return false;
这个方法中获取成功调用的Qps和异常调用的Qps,验证后,然后求一下比率,如果没有大于count,那么就返回true,否则返回false抛出异常。
我们再进入到exceptionQps方法中看一下:
StatisticNode#exceptionQps
public double exceptionQps() {
return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec();
}
rollingCounterInSecond.getWindowIntervalInSec方法是表示窗口的时间长度,用秒来表示。这里返回的是1。
ArrayMetric#exception
public long exception() {
data.currentWindow();
long exception = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
exception += window.exception();
}
return exception;
}
这个方法和我上面分析的差不多,大家看看就好了。
if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
根据异常数降级是非常的直接的,直接根据统计的异常总次数判断是否超过count。
到这里就讲完了降级的实现咯~~
这也不是一篇非常详细的源码领读,源码细节还需你自己仔细咀嚼 这只是我在改了些sentinel bug后,梳理的脉络,主要是脉络。看完后对Sentinel的源码模块划分和大致交互有个整体印象。 从而
还有其他类似的问题,但我想将其归结为最基本的问题。 我正在运行一个 .NET 应用程序 (C#) 并尝试连接并监视一组运行哨兵 ( 3x 哨兵监控 1 个主站和 2 个从站)。它们在 linux 机器
我们有 2 个运行 HA 应用程序的应用程序/Web 服务器,我们需要设置具有高可用性/复制功能的 Redis 以支持我们的应用程序。 考虑到 3 个节点的最低哨兵设置要求。 我们计划准备第一个应用程
我想知道是否可以将 Azure AD 用户配置文件详细信息(使用位置、国家或地区、办公室)获取到 azure Sentinel 日志中? Kusto 查询会是什么? 最佳答案 此功能将作为 Azure
Sentinel Dashboard限流规则保存 sentinel在限流规则配置方面提供了可视化页面 sentinel dashboard,源码可从github下载,请自行搜索,此处不提供下载链接
概念 何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如: 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
概述 除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。由于调用关系的复杂性,如果调用链路中的某个资源不稳定,最终会导致请求发生堆积。Sentinel 熔断降级会在调
前言 在上一节中我们知道Sentinel 支持以下几种规则:流量控制规则、熔断降级规则、系统保护规则、来源访问控制规则 和 热点参数规则。 Sentinel 的所有规则都可以在内存态中动态地查询及修改
在之前使用Nacos持久化规则的文档中,最后发现只能使用Nacos推送配置到控制台,那么怎么实现控制台和Nacos的双向同步呢? 这里不直接提供解决方案,我们还是先分析下控制台的源码。 下面我们分
化整为零 我们已经知道了Slot是从第一个往后一直传递到最后一个的,且当信息传递到StatisticSlot时,这里就开始进行统计了,统计的结果又会被后续的Slot所采用,作为规则校验的依据。我们先
Sentinel简介 Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定
之前遗留问题 之前我们集成了Nacos,实现了推送规则到客户端,但是控制台添加的规则不能持久化到Naocs中,这时需要对控制台源码进行改造。 先来回顾下默认添加流控规则的执行流程: 1、 控
Sentinel 中有很多比较重要的概念,我们要了解一个框架,首先要对框架中重要的概念实体进行分析,本文我将跟大家一起来分析一下 Sentinel 中非常重要的几个概念。 Resource Res
通过sentinel 的控制台,我们可以对规则进行查询和修改,也可以查看到实时监控,机器列表等信息,所以我们需要对 sentinel 的控制台做个完整的了解。 启动控制台 从github上下载源码
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助您保障微服务的稳定
控制台 控制台主要的处理类是 FlowControllerV1 。 @RestController @RequestMapping(value = "/v1/flow") pu
项目结构 将Sentinel的源码fork到自己的github库中,接着把源码clone到本地,然后开始源码阅读之旅吧。 首先我们看一下Sentinel项目的整个结构: sentine
雪崩效应 雪崩 一种自然现象,当山坡积雪内部的内聚力抗拒不了它所受到的重力拉引时,便向下滑动,引起大量雪体崩塌。 雪崩效应广泛应用于各种领域,比如密码学术语、管理学、商业等。 服务雪崩 在软件
实时监控 集成控制台后,当有请求时,实时监控页面会显示当前服务各个接口的访问信息,以图表的形式展示给用户,包含访问时间、通过 QPS、拒绝QPS、响应时间(ms)等信息。 簇点链路 列表:用
实时监控 Sentinel 提供对所有资源的实时监控,我们可以通过控制台查看。 可以很清楚的看到当前QPS流量趋势。 监控API 簇点监控 获取簇点列表 API: GET /cluste
我是一名优秀的程序员,十分优秀!