- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
在之前使用Nacos持久化规则的文档中,最后发现只能使用Nacos推送配置到控制台,那么怎么实现控制台和Nacos的双向同步呢?
这里不直接提供解决方案,我们还是先分析下控制台的源码。
下面我们分析下添加、查询流控规则的源码及流程。
首先分析下用到的相关类
RuleEntity接口,其实现类就是这些规则对应的实体类了。
重点看下FlowRuleEntity源码:
public class FlowRuleEntity implements RuleEntity {
// 主键ID
private Long id;
// 后台应用名(客户端)
private String app;
// 后台应用IP
private String ip;
// 后台和控制台通信的端口 8719
private Integer port;
// 针对来源
private String limitApp;
// 资源名
private String resource;
/**
* 阈值类型
* 0为线程数;1为qps
*/
private Integer grade;
// 单机阈值
private Double count;
/**
* 流控模式
* 0为直接限流;1为关联限流;2为链路限流
*/
private Integer strategy;
// 关联限流时的关联资源
private String refResource;
/**
* 流控效果 快速失败 Warm Up 排队等待
* 0. default, 1. warm up, 2. rate limiter
*/
private Integer controlBehavior;
// warm up模式 预热时长
private Integer warmUpPeriodSec;
/**
* 速率限制器行为中的最大排队时间
*/
private Integer maxQueueingTimeMs;
// 是否集群
private boolean clusterMode;
/**
* 集群模式的流规则配置
*/
private ClusterFlowConfig clusterConfig;
// 创建时间
private Date gmtCreate;
// 修改时间
private Date gmtModified;
/**
* FlowRule=>FlowRuleEntity
*/
public static FlowRuleEntity fromFlowRule(String app, String ip, Integer port, FlowRule rule) {
// 省略.....
}
/**
* 实体类转为FlowRule
*/
@Override
public FlowRule toRule() {
// 省略.....
}
}
可以看到FlowRuleEntity就对应了界面中新增流控规则界面了。。
RuleRepository是存储和查询规则的顶级接口,添加了增加、删除、查询规则的一系列方法。
public interface RuleRepository<T, ID> {
T save(T entity);
List<T> saveAll(List<T> rules);
T delete(ID id);
T findById(ID id);
List<T> findAllByMachine(MachineInfo machineInfo);
List<T> findAllByApp(String appName);
}
规则存储针对每种规则,都有对应的实现类,其抽象类InMemoryRuleRepositoryAdapter表示将规则存储在内存中,也是框架提供了唯一一个存储方式。
我们重点看下规则保存接口,这里会将所有规则保存到ConcurrentHashMap中。
@Override
public T save(T entity) {
// 1. 设置ID
if (entity.getId() == null) {
entity.setId(nextId());
}
// 2. 调用子类处理实体类
T processedEntity = preProcess(entity);
if (processedEntity != null) {
// 3. 将规则添加到ConcurrentHashMap,ID为KEY,规则为Value
allRules.put(processedEntity.getId(), processedEntity);
// 4. 将规则添加到ConcurrentHashMap,MachineInfo为KEY,所有的规则为Value
machineRules.computeIfAbsent(MachineInfo.of(processedEntity.getApp(), processedEntity.getIp(),
processedEntity.getPort()), e -> new ConcurrentHashMap<>(32))
.put(processedEntity.getId(), processedEntity);
// 5. 将规则添加到ConcurrentHashMap,后台应用名为KEY,所有的规则为Value
appRules.computeIfAbsent(processedEntity.getApp(), v -> new ConcurrentHashMap<>(32))
.put(processedEntity.getId(), processedEntity);
}
return processedEntity;
}
SentinelApiClient类主要负责与 Sentinel 客户端通信,会发送HTTP调用客户端的API接口进行数据交互。
定义了很多常量,大部分都是API路径。
private static final String RESOURCE_URL_PATH = "jsonTree";
private static final String CLUSTER_NODE_PATH = "clusterNode";
private static final String GET_RULES_PATH = "getRules";
private static final String SET_RULES_PATH = "setRules";
private static final String GET_PARAM_RULE_PATH = "getParamFlowRules";
private static final String SET_PARAM_RULE_PATH = "setParamFlowRules";
private static final String FETCH_CLUSTER_MODE_PATH = "getClusterMode";
private static final String MODIFY_CLUSTER_MODE_PATH = "setClusterMode";
private static final String FETCH_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/fetchConfig";
private static final String MODIFY_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/modifyConfig";
private static final String FETCH_CLUSTER_SERVER_BASIC_INFO_PATH = "cluster/server/info";
private static final String MODIFY_CLUSTER_SERVER_TRANSPORT_CONFIG_PATH = "cluster/server/modifyTransportConfig";
private static final String MODIFY_CLUSTER_SERVER_FLOW_CONFIG_PATH = "cluster/server/modifyFlowConfig";
private static final String MODIFY_CLUSTER_SERVER_NAMESPACE_SET_PATH = "cluster/server/modifyNamespaceSet";
private static final String FETCH_GATEWAY_API_PATH = "gateway/getApiDefinitions";
private static final String MODIFY_GATEWAY_API_PATH = "gateway/updateApiDefinitions";
private static final String FETCH_GATEWAY_FLOW_RULE_PATH = "gateway/getRules";
private static final String MODIFY_GATEWAY_FLOW_RULE_PATH = "gateway/updateRules";
private static final String FLOW_RULE_TYPE = "flow";
private static final String DEGRADE_RULE_TYPE = "degrade";
private static final String SYSTEM_RULE_TYPE = "system";
private static final String AUTHORITY_TYPE = "authority";
接下来看下SentinelApiClient中的setRulesAsync方法,它的作用主要是异步请求客户端设置规则。
/**
* 异步请求客户端设置规则
* @param app 应用名
* @param ip 应用IP
* @param port 通信端口
* @param type 规则类型
* @param entities 规则
* @return
*/
private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
try {
// 1. 检查参数
AssertUtil.notNull(entities, "rules cannot be null");
AssertUtil.notEmpty(app, "Bad app name");
AssertUtil.notEmpty(ip, "Bad machine IP");
AssertUtil.isTrue(port > 0, "Bad machine port");
// 2. 规则集合转为Json
String data = JSON.toJSONString(
entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
Map<String, String> params = new HashMap<>(2);
// 3. 设置请求参数
params.put("type", type);
params.put("data", data);
// 4. 发送请求
return executeCommand(app, ip, port, SET_RULES_PATH, params, true)
.thenCompose(r -> {
if ("success".equalsIgnoreCase(r.trim())) {
return CompletableFuture.completedFuture(null);
}
return AsyncUtils.newFailedFuture(new CommandFailedException(r));
});
} catch (Exception e) {
logger.error("setRulesAsync API failed, type={}", type, e);
return AsyncUtils.newFailedFuture(e);
}
}
接下来看下SentinelApiClient中的executeCommand方法,它的作用就是执行请求了。
private CompletableFuture<String> executeCommand(String app, String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
// 1. 拼接请求URL http://192.168.1.20:8721/setRules
CompletableFuture<String> future = new CompletableFuture<>();
if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
future.completeExceptionally(new IllegalArgumentException("Bad URL or command name"));
return future;
}
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append("http://");
urlBuilder.append(ip).append(':').append(port).append('/').append(api);
if (params == null) {
params = Collections.emptyMap();
}
// 2. 执行GET请求,参数拼在URL后面
if (!useHttpPost || !isSupportPost(app, ip, port)) {
// Using GET in older versions, append parameters after url
if (!params.isEmpty()) {
if (urlBuilder.indexOf("?") == -1) {
urlBuilder.append('?');
} else {
urlBuilder.append('&');
}
urlBuilder.append(queryString(params));
}
return executeCommand(new HttpGet(urlBuilder.toString()));
} else {
// Using POST
// 3. 执行POST请求
return executeCommand(
postRequest(urlBuilder.toString(), params, isSupportEnhancedContentType(app, ip, port)));
}
}
最终请求会使用apache提供了httpClient执行请求,获取返回结果。
首先在页面中添加一个流控规则,并F12打开开发者模式,对/app/get进行限流。
点击新增按钮,发送请求,我们看到是访问的/v1/flow/rule,然后注意下请求参数。
上面的访问路径,对应的就是FlowControllerV1中的apiAddFlowRule控制器了,这是一个Spring MVC 接口。
这个接口主要是保存了规则在控制台的内存中,然后又调用了客户端的API,将规则发送给了客户端应用,具体怎么执行了,之前的核心类源码SentinelApiClient已经分析过了。
@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
// 1. 参数校验
Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
if (checkResult != null) {
return checkResult;
}
// 2. 设置附加参数
entity.setId(null);
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
entity.setLimitApp(entity.getLimitApp().trim());
entity.setResource(entity.getResource().trim());
try {
// 3. 保存流控规则,默认在内存,InMemoryRuleRepositoryAdapter
entity = repository.save(entity);
// http://192.168.1.20:8721/setRules
// 4. 调用客户端的API重新设置规则 SentinelApiClient
publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(entity);
} catch (Throwable t) {
Throwable e = t instanceof ExecutionException ? t.getCause() : t;
logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
return Result.ofFail(-1, e.getMessage());
}
}
第二步中控制台调用了客户端的setRules接口,接下来我们看下客户端这个接口都做了什么。
setRules接口进入的是ModifyRulesCommandHandler处理器进行处理,其handle方法,主要是接受请求,然后根据不同的规则类型的管理器进行处理。
public CommandResponse<String> handle(CommandRequest request) {
// 1. XXX from 1.7.2, 当 fastjson 早于 1.2.12 时强制失败
if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
// fastjson版本太低
return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
+ "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
}
// 2. 获取请求参数
String type = request.getParam("type");
String data = request.getParam("data");
if (StringUtil.isNotEmpty(data)) {
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
RecordLog.info("Decode rule data error", e);
return CommandResponse.ofFailure(e, "decode rule data error");
}
}
RecordLog.info("Receiving rule change (type: {}): {}", type, data);
String result = "success";
// 3. 判断规则类型
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
// 流控规则 解析参数为流控规则对象集合
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
// 调用流量规则管理器加载规则 返回结果
FlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
流控规则调用的是FlowRuleManager,其loadRules方法最终调用的就是DynamicSentinelProperty的updateValue方法。
可以看到DynamicSentinelProperty维护了之前流控规则,并接受了新的流控规则。
之后会调用管理器中的监听器并循环。
最终调用监听器的configUpdate方法,更新规则管理器中存放规则的ConcurrentHashMap,这样客户端的内存中流控规则也就更新了。
F12可以看到访问的是/v1/flow/rules接口。
/v1/flow/rules接口的逻辑处理如下:
@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
@RequestParam String ip,
@RequestParam Integer port) {
if (StringUtil.isEmpty(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
if (StringUtil.isEmpty(ip)) {
return Result.ofFail(-1, "ip can't be null or empty");
}
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
try {
// 1. 调用客户端API,查询规则 http://192.168.1.20:8721/getRules
List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);
// 2. 将客户端查询到的规则 重新存放到控制台中,会事先清理控制台内存中的规则
rules = repository.saveAll(rules);
return Result.ofSuccess(rules);
} catch (Throwable throwable) {
logger.error("Error when querying flow rules", throwable);
return Result.ofThrowable(-1, throwable);
}
}
控制台发出getRules请求后,是交给FetchActiveRuleCommandHandler处理器来进行处理。
@Override
public CommandResponse<String> handle(CommandRequest request) {
String type = request.getParam("type");
if ("flow".equalsIgnoreCase(type)) {
// 调用管理器获取规则
return CommandResponse.ofSuccess(JSON.toJSONString(FlowRuleManager.getRules()));
} else if ("degrade".equalsIgnoreCase(type)) {
return CommandResponse.ofSuccess(JSON.toJSONString(DegradeRuleManager.getRules()));
} else if ("authority".equalsIgnoreCase(type)) {
return CommandResponse.ofSuccess(JSON.toJSONString(AuthorityRuleManager.getRules()));
} else if ("system".equalsIgnoreCase(type)) {
return CommandResponse.ofSuccess(JSON.toJSONString(SystemRuleManager.getRules()));
} else {
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
}
在管理器中会将内存中的规则返回给控制台。
public static List<FlowRule> getRules() {
List<FlowRule> rules = new ArrayList<FlowRule>();
for (Map.Entry<String, List<FlowRule>> entry : flowRules.entrySet()) {
rules.addAll(entry.getValue());
}
return rules;
}
我创建了一个用户可以添加测试的字段。这一切运行顺利我只希望当用户点击(添加另一个测试)然后上一个(添加另一个测试)删除并且这个显示在新字段中。 所有运行良好的唯一问题是点击(添加另一个字段)之前添加另
String[] option = {"Adlawan", "Angeles", "Arreza", "Benenoso", "Bermas", "Brebant
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
我正在努力将 jQuery 滚动功能添加到 nav-tab (Bootstrap 3)。我希望用户能够选择他们想要的选项卡,并在选项卡内容中有一个可以平滑滚动到 anchor 的链接。这是我的代码,可
我正在尝试在用户登录后再添加 2 个 ui 选项卡。首先,我尝试做一个之后。 $('#slideshow').tabs('remove', '4'); $("#slideshow ul li:last
我有一个包含选择元素的表单,我想通过选择添加和删除其中一些元素。这是html代码(这里也有jsfiddle http://jsfiddle.net/txhajy2w/):
正在写这个: view.backgroundColor = UIColor.white.withAlphaComponent(0.9) 等同于: view.backgroundColor = UICo
好的,如果其中有任何信息,我想将这些列添加到一起。所以说我有 账户 1 2 3 . 有 4 个帐户空间,但只有 3 个帐户。我如何创建 java 脚本来添加它。 最佳答案 Live Example H
我想知道是否有一种有效的预制算法来确定一组数字的和/差是否可以等于不同的数字。示例: 5、8、10、2,使用 + 或 - 等于 9。5 - 8 = -3 + 10 = 7 + 2 = 9 如果有一个预
我似乎有一个卡住的 git repo。它卡在所有基本的添加、提交命令上,git push 返回所有内容为最新的。 从其他帖子我已经完成了 git gc 和 git fsck/ 我认为基本的调试步骤是
我的 Oracle SQL 查询如下- Q1- select hca.account_number, hca.attribute3, SUM(rcl.extended_amou
我正在阅读 http://developer.apple.com/iphone/library/documentation/iPhone/Conceptual/iPhoneOSProgrammingG
我正在尝试添加一个“加载更多”按钮并限制下面的结果,这样投资组合页面中就不会同时加载 1000 个内容,如下所示:http://typesetdesign.com/portfolio/ 我对 PHP
我遇到这个问题,我添加了 8 个文本框,它工作正常,但是当我添加更多文本框(如 16 个文本框)时,它不会添加最后一个文本框。有人遇到过这个问题吗?提前致谢。 Live Link: JAVASCRIP
add/remove clone first row default not delete 添加/删除克隆第一行默认不删除&并获取正确的SrNo(例如:添加3行并在看到问题后删除SrNo.2)
我编码this ,但删除按钮不起作用。我在控制台中没有任何错误.. var counter = 0; var dataList = document.getElementById('materi
我有一个类似数组的对象: [1:数组[10]、2:数组[2]、3:数组[2]、4:数组[2]、5:数组[3]、6:数组[1]] 我正在尝试删除前两个元素,执行一些操作,然后将它们再次插入到同一位置。
使用的 Delphi 版本:2007 你好, 我有一个 Tecord 数组 TInfo = Record Name : String; Price : Integer; end; var Info
我使用了基本的 gridster 代码,然后我声明了通过按钮添加和删除小部件的函数它工作正常但是当我将调整大小功能添加到上面的代码中时,它都不起作用(我的意思是调整大小,添加和删除小部件) 我的js代
title 323 323 323 title 323 323 323 title 323 323 323 JS $(document).keydown(function(e){
我是一名优秀的程序员,十分优秀!