- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
在之前使用Nacos持久化规则的文档中,最后发现只能使用Nacos推送配置到控制台,那么怎么实现控制台和Nacos的双向同步呢?
这里不直接提供解决方案,我们还是先分析下控制台的源码。
下面我们分析下添加、查询流控规则的源码及流程。
首先分析下用到的相关类
RuleEntity接口,其实现类就是这些规则对应的实体类了。
重点看下FlowRuleEntity源码:
public class FlowRuleEntity implements RuleEntity {
// 主键ID
private Long id;
// 后台应用名(客户端)
private String app;
// 后台应用IP
private String ip;
// 后台和控制台通信的端口 8719
private Integer port;
// 针对来源
private String limitApp;
// 资源名
private String resource;
/**
* 阈值类型
* 0为线程数;1为qps
*/
private Integer grade;
// 单机阈值
private Double count;
/**
* 流控模式
* 0为直接限流;1为关联限流;2为链路限流
*/
private Integer strategy;
// 关联限流时的关联资源
private String refResource;
/**
* 流控效果 快速失败 Warm Up 排队等待
* 0. default, 1. warm up, 2. rate limiter
*/
private Integer controlBehavior;
// warm up模式 预热时长
private Integer warmUpPeriodSec;
/**
* 速率限制器行为中的最大排队时间
*/
private Integer maxQueueingTimeMs;
// 是否集群
private boolean clusterMode;
/**
* 集群模式的流规则配置
*/
private ClusterFlowConfig clusterConfig;
// 创建时间
private Date gmtCreate;
// 修改时间
private Date gmtModified;
/**
* FlowRule=>FlowRuleEntity
*/
public static FlowRuleEntity fromFlowRule(String app, String ip, Integer port, FlowRule rule) {
// 省略.....
}
/**
* 实体类转为FlowRule
*/
@Override
public FlowRule toRule() {
// 省略.....
}
}
可以看到FlowRuleEntity就对应了界面中新增流控规则界面了。。
RuleRepository是存储和查询规则的顶级接口,添加了增加、删除、查询规则的一系列方法。
public interface RuleRepository<T, ID> {
T save(T entity);
List<T> saveAll(List<T> rules);
T delete(ID id);
T findById(ID id);
List<T> findAllByMachine(MachineInfo machineInfo);
List<T> findAllByApp(String appName);
}
规则存储针对每种规则,都有对应的实现类,其抽象类InMemoryRuleRepositoryAdapter表示将规则存储在内存中,也是框架提供了唯一一个存储方式。
我们重点看下规则保存接口,这里会将所有规则保存到ConcurrentHashMap中。
@Override
public T save(T entity) {
// 1. 设置ID
if (entity.getId() == null) {
entity.setId(nextId());
}
// 2. 调用子类处理实体类
T processedEntity = preProcess(entity);
if (processedEntity != null) {
// 3. 将规则添加到ConcurrentHashMap,ID为KEY,规则为Value
allRules.put(processedEntity.getId(), processedEntity);
// 4. 将规则添加到ConcurrentHashMap,MachineInfo为KEY,所有的规则为Value
machineRules.computeIfAbsent(MachineInfo.of(processedEntity.getApp(), processedEntity.getIp(),
processedEntity.getPort()), e -> new ConcurrentHashMap<>(32))
.put(processedEntity.getId(), processedEntity);
// 5. 将规则添加到ConcurrentHashMap,后台应用名为KEY,所有的规则为Value
appRules.computeIfAbsent(processedEntity.getApp(), v -> new ConcurrentHashMap<>(32))
.put(processedEntity.getId(), processedEntity);
}
return processedEntity;
}
SentinelApiClient类主要负责与 Sentinel 客户端通信,会发送HTTP调用客户端的API接口进行数据交互。
定义了很多常量,大部分都是API路径。
private static final String RESOURCE_URL_PATH = "jsonTree";
private static final String CLUSTER_NODE_PATH = "clusterNode";
private static final String GET_RULES_PATH = "getRules";
private static final String SET_RULES_PATH = "setRules";
private static final String GET_PARAM_RULE_PATH = "getParamFlowRules";
private static final String SET_PARAM_RULE_PATH = "setParamFlowRules";
private static final String FETCH_CLUSTER_MODE_PATH = "getClusterMode";
private static final String MODIFY_CLUSTER_MODE_PATH = "setClusterMode";
private static final String FETCH_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/fetchConfig";
private static final String MODIFY_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/modifyConfig";
private static final String FETCH_CLUSTER_SERVER_BASIC_INFO_PATH = "cluster/server/info";
private static final String MODIFY_CLUSTER_SERVER_TRANSPORT_CONFIG_PATH = "cluster/server/modifyTransportConfig";
private static final String MODIFY_CLUSTER_SERVER_FLOW_CONFIG_PATH = "cluster/server/modifyFlowConfig";
private static final String MODIFY_CLUSTER_SERVER_NAMESPACE_SET_PATH = "cluster/server/modifyNamespaceSet";
private static final String FETCH_GATEWAY_API_PATH = "gateway/getApiDefinitions";
private static final String MODIFY_GATEWAY_API_PATH = "gateway/updateApiDefinitions";
private static final String FETCH_GATEWAY_FLOW_RULE_PATH = "gateway/getRules";
private static final String MODIFY_GATEWAY_FLOW_RULE_PATH = "gateway/updateRules";
private static final String FLOW_RULE_TYPE = "flow";
private static final String DEGRADE_RULE_TYPE = "degrade";
private static final String SYSTEM_RULE_TYPE = "system";
private static final String AUTHORITY_TYPE = "authority";
接下来看下SentinelApiClient中的setRulesAsync方法,它的作用主要是异步请求客户端设置规则。
/**
* 异步请求客户端设置规则
* @param app 应用名
* @param ip 应用IP
* @param port 通信端口
* @param type 规则类型
* @param entities 规则
* @return
*/
private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
try {
// 1. 检查参数
AssertUtil.notNull(entities, "rules cannot be null");
AssertUtil.notEmpty(app, "Bad app name");
AssertUtil.notEmpty(ip, "Bad machine IP");
AssertUtil.isTrue(port > 0, "Bad machine port");
// 2. 规则集合转为Json
String data = JSON.toJSONString(
entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
Map<String, String> params = new HashMap<>(2);
// 3. 设置请求参数
params.put("type", type);
params.put("data", data);
// 4. 发送请求
return executeCommand(app, ip, port, SET_RULES_PATH, params, true)
.thenCompose(r -> {
if ("success".equalsIgnoreCase(r.trim())) {
return CompletableFuture.completedFuture(null);
}
return AsyncUtils.newFailedFuture(new CommandFailedException(r));
});
} catch (Exception e) {
logger.error("setRulesAsync API failed, type={}", type, e);
return AsyncUtils.newFailedFuture(e);
}
}
接下来看下SentinelApiClient中的executeCommand方法,它的作用就是执行请求了。
private CompletableFuture<String> executeCommand(String app, String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
// 1. 拼接请求URL http://192.168.1.20:8721/setRules
CompletableFuture<String> future = new CompletableFuture<>();
if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
future.completeExceptionally(new IllegalArgumentException("Bad URL or command name"));
return future;
}
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append("http://");
urlBuilder.append(ip).append(':').append(port).append('/').append(api);
if (params == null) {
params = Collections.emptyMap();
}
// 2. 执行GET请求,参数拼在URL后面
if (!useHttpPost || !isSupportPost(app, ip, port)) {
// Using GET in older versions, append parameters after url
if (!params.isEmpty()) {
if (urlBuilder.indexOf("?") == -1) {
urlBuilder.append('?');
} else {
urlBuilder.append('&');
}
urlBuilder.append(queryString(params));
}
return executeCommand(new HttpGet(urlBuilder.toString()));
} else {
// Using POST
// 3. 执行POST请求
return executeCommand(
postRequest(urlBuilder.toString(), params, isSupportEnhancedContentType(app, ip, port)));
}
}
最终请求会使用apache提供了httpClient执行请求,获取返回结果。
首先在页面中添加一个流控规则,并F12打开开发者模式,对/app/get进行限流。
点击新增按钮,发送请求,我们看到是访问的/v1/flow/rule,然后注意下请求参数。
上面的访问路径,对应的就是FlowControllerV1中的apiAddFlowRule控制器了,这是一个Spring MVC 接口。
这个接口主要是保存了规则在控制台的内存中,然后又调用了客户端的API,将规则发送给了客户端应用,具体怎么执行了,之前的核心类源码SentinelApiClient已经分析过了。
@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
// 1. 参数校验
Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
if (checkResult != null) {
return checkResult;
}
// 2. 设置附加参数
entity.setId(null);
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
entity.setLimitApp(entity.getLimitApp().trim());
entity.setResource(entity.getResource().trim());
try {
// 3. 保存流控规则,默认在内存,InMemoryRuleRepositoryAdapter
entity = repository.save(entity);
// http://192.168.1.20:8721/setRules
// 4. 调用客户端的API重新设置规则 SentinelApiClient
publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(entity);
} catch (Throwable t) {
Throwable e = t instanceof ExecutionException ? t.getCause() : t;
logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
return Result.ofFail(-1, e.getMessage());
}
}
第二步中控制台调用了客户端的setRules接口,接下来我们看下客户端这个接口都做了什么。
setRules接口进入的是ModifyRulesCommandHandler处理器进行处理,其handle方法,主要是接受请求,然后根据不同的规则类型的管理器进行处理。
public CommandResponse<String> handle(CommandRequest request) {
// 1. XXX from 1.7.2, 当 fastjson 早于 1.2.12 时强制失败
if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
// fastjson版本太低
return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
+ "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
}
// 2. 获取请求参数
String type = request.getParam("type");
String data = request.getParam("data");
if (StringUtil.isNotEmpty(data)) {
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
RecordLog.info("Decode rule data error", e);
return CommandResponse.ofFailure(e, "decode rule data error");
}
}
RecordLog.info("Receiving rule change (type: {}): {}", type, data);
String result = "success";
// 3. 判断规则类型
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
// 流控规则 解析参数为流控规则对象集合
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
// 调用流量规则管理器加载规则 返回结果
FlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
流控规则调用的是FlowRuleManager,其loadRules方法最终调用的就是DynamicSentinelProperty的updateValue方法。
可以看到DynamicSentinelProperty维护了之前流控规则,并接受了新的流控规则。
之后会调用管理器中的监听器并循环。
最终调用监听器的configUpdate方法,更新规则管理器中存放规则的ConcurrentHashMap,这样客户端的内存中流控规则也就更新了。
F12可以看到访问的是/v1/flow/rules接口。
/v1/flow/rules接口的逻辑处理如下:
@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
@RequestParam String ip,
@RequestParam Integer port) {
if (StringUtil.isEmpty(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
if (StringUtil.isEmpty(ip)) {
return Result.ofFail(-1, "ip can't be null or empty");
}
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
try {
// 1. 调用客户端API,查询规则 http://192.168.1.20:8721/getRules
List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);
// 2. 将客户端查询到的规则 重新存放到控制台中,会事先清理控制台内存中的规则
rules = repository.saveAll(rules);
return Result.ofSuccess(rules);
} catch (Throwable throwable) {
logger.error("Error when querying flow rules", throwable);
return Result.ofThrowable(-1, throwable);
}
}
控制台发出getRules请求后,是交给FetchActiveRuleCommandHandler处理器来进行处理。
@Override
public CommandResponse<String> handle(CommandRequest request) {
String type = request.getParam("type");
if ("flow".equalsIgnoreCase(type)) {
// 调用管理器获取规则
return CommandResponse.ofSuccess(JSON.toJSONString(FlowRuleManager.getRules()));
} else if ("degrade".equalsIgnoreCase(type)) {
return CommandResponse.ofSuccess(JSON.toJSONString(DegradeRuleManager.getRules()));
} else if ("authority".equalsIgnoreCase(type)) {
return CommandResponse.ofSuccess(JSON.toJSONString(AuthorityRuleManager.getRules()));
} else if ("system".equalsIgnoreCase(type)) {
return CommandResponse.ofSuccess(JSON.toJSONString(SystemRuleManager.getRules()));
} else {
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
}
在管理器中会将内存中的规则返回给控制台。
public static List<FlowRule> getRules() {
List<FlowRule> rules = new ArrayList<FlowRule>();
for (Map.Entry<String, List<FlowRule>> entry : flowRules.entrySet()) {
rules.addAll(entry.getValue());
}
return rules;
}
这个问题在这里已经有了答案: Why filter() after flatMap() is "not completely" lazy in Java streams? (8 个答案) 关闭 6
我正在创建一个应用程序来从 Instagram 收集数据。我正在寻找像 Twitter 流 API 这样的流 API,这样我就可以自动实时收集数据而无需发送请求。 Instagram 有类似的 API
我正在使用 Apache Commons 在 Google App Engine 中上传一个 .docx 文件,如此链接中所述 File upload servlet .上传时,我还想使用 Apach
我尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器捕获 DynamoDB 表更改。我正在 Scala 应用程序中使用 AWS Java 开发工具
我目前有一个采用 H.264 编码的 IP 摄像机流式视频 (RTSP)。 我想使用 FFmpeg 将此 H.264 编码流转换为另一个 RTSP 流,但 MPEG-2 编码。我该怎么做?我应该使用哪
Redis 流是否受益于集群模式?假设您有 10 个流,它们是分布在整个集群中还是都分布在同一节点上?我计划使用 Redis 流来实现真正的高吞吐量(200 万条消息/秒),所以我担心这种规模的 Re
这件事困扰了我一段时间。 所以我有一个 Product 类,它有一个 Image 列表(该列表可能为空)。 我想做 product.getImages().stream().filter(...) 但
是否可以使用 具有持久存储的 Redis 流 还是流仅限于内存数据? 我知道可以将 Redis 与核心数据结构的持久存储一起使用,但我已经能够理解是否也可以使用 Redis 中的流的持久存储。 最佳答
我开始学习 Elixir 并遇到了一个我无法轻松解决的挑战。 我正在尝试创建一个函数,该函数接受一个 Enumerable.t 并返回另一个 Enumerable.t ,其中包含下 n 个项目。它与
我试图从 readLine 调用创建一个无限的字符串流: import java.io.{BufferedReader, InputStreamReader} val in = new Buffere
你能帮我使用 Java 8 流 API 编写以下代码吗? SuperUser superUser = db.getSuperUser; for (final Client client : super
我正在尝试服用补品routeguide tutorial,并将客户端变成rocket服务器。我只是接受响应并将gRPC转换为字符串。 service RouteGuide { rpc GetF
流程代码可以是run here. 使用 flow,我有一个函数,它接受一个键值对对象并获取它的值 - 它获取的值应该是字符串、数字或 bool 值。 type ValueType = string
如果我有一个函数返回一个包含数据库信息的对象或一个空对象,如下所示: getThingFromDB: async function(id:string):Promise{ const from
我正在尝试使用javascript api和FB.ui将ogg音频文件发布到流中, 但是我不知道该怎么做。 这是我给FB.ui的电话: FB.ui( { method: '
我正在尝试删除工作区(或克隆它以使其看起来像父工作区,但我似乎两者都做不到)。但是,当我尝试时,我收到此消息:无法删除工作区 test_workspace,因为它有一个非空的默认组。 据我所知,这意味
可以使用 Stream|Map 来完成此操作,这样我就不需要将结果放入外部 HashMap 中,而是使用 .collect(Collectors.toMap(...)); 收集结果? Map rep
当我们从集合列表中获取 Stream 时,幕后到底发生了什么?我发现很多博客都说Stream不存储任何数据。如果这是真的,请考虑代码片段: List list = new ArrayList(); l
我对流及其工作方式不熟悉,我正在尝试获取列表中添加的特定对象的出现次数。 我找到了一种使用Collections来做到这一点的方法。其过程如下: for (int i = 0; i p.conten
我希望将一个 map 列表转换为另一个分组的 map 列表。 所以我有以下 map 列表 - List [{ "accId":"1", "accName":"TestAcc1", "accNumber
我是一名优秀的程序员,十分优秀!