- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
前面花了大量篇幅来介绍Brave的使用,一直把Zipkin当黑盒在使用,现在来逐渐拨开Zipkin的神秘面纱。
Zipkin的源代码地址为:https://github.com/openzipkin/zipkin
Zipkin的源码结构
zipkin - 对应的是zipkin v1
zipkin2 - 对应的是zipkin v2
zipkin-server - 是zipkin的web工程目录,zipkin.server.ZipkinServer是启动类
zipkin-ui - zipkin ui工程目录,zipkin的设计师前后端分离的,zipkin-server提供数据查询接口,zipkin-ui做数据展现。
zipkin-autoconfigure - 是为springboot提供的自动配置相关的类
collector-kafka
collector-kafka10
collector-rabbitmq
collector-scribe
metrics-prometheus
storage-cassandra
storage-cassandra3
storage-elasticsearch-aws
storage-elasticsearch-http
storage-mysql
ui
zipkin-collector - 是zipkin比较重要的模块,收集trace信息,支持从kafka和rabbitmq,以及scribe中收集,这个模块是可选的,因为zipkin默认使用http协议提供给客户端来收集
zipkin-storage - 也是zipkin比较重要的模块,用于存储收集的trace信息,默认是使用内置的InMemoryStorage,即存储在内存中,重启就会丢失。我们可以根据我们实际的需要更换存储方式,将trace存储在mysql,elasticsearch,cassandra中。
ZipkinServer是SpringBoot启动类,该类上使用了@EnableZipkinServer注解,加载了相关的Bean,而且在启动方法中添加了监听器RegisterZipkinHealthIndicators类,来初始化健康检查的相关bean。
@SpringBootApplication
@EnableZipkinServer
public class ZipkinServer {
public static void main(String[] args) {
new SpringApplicationBuilder(ZipkinServer.class)
.listeners(new RegisterZipkinHealthIndicators())
.properties("spring.config.name=zipkin-server").run(args);
}
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({
ZipkinServerConfiguration.class,
BraveConfiguration.class,
ZipkinQueryApiV1.class,
ZipkinHttpCollector.class
})
public @interface EnableZipkinServer {
}
EnableZipkinServer注解导入了ZipkinServerConfiguration,BraveConfiguration,ZipkinQueryApiV1,ZipkinHttpCollector。注意,这里并没有导入ZipkinQueryApiV2,但是由于SpringBoot项目会默认加载和启动类在一个包,或者在其子包的所有使用Component,Controller,Service等注解的类,所以在启动后,也会发现ZipkinQueryApiV2也被加载了。
所有Zipkin服务需要的Bean都在这个类里进行配置
Zipkin健康自检的类,实现了springboot-actuate的CompositeHealthIndicator,提供系统组件的健康信息
final class ZipkinHealthIndicator extends CompositeHealthIndicator {
ZipkinHealthIndicator(HealthAggregator healthAggregator) {
super(healthAggregator);
}
void addComponent(Component component) {
String healthName = component instanceof V2StorageComponent
? ((V2StorageComponent) component).delegate().getClass().getSimpleName()
: component.getClass().getSimpleName();
healthName = healthName.replace("AutoValue_", "");
addHealthIndicator(healthName, new ComponentHealthIndicator(component));
}
static final class ComponentHealthIndicator implements HealthIndicator {
final Component component;
ComponentHealthIndicator(Component component) {
this.component = component;
}
@Override public Health health() {
Component.CheckResult result = component.check();
return result.ok ? Health.up().build() : Health.down(result.exception).build();
}
}
}
启动时加载的RegisterZipkinHealthIndicators类,当启动启动后,收到ApplicationReadyEvent事件,即系统已经启动完毕,会将Spring容器中的zipkin.Component添加到ZipkinHealthIndicator中
public final class RegisterZipkinHealthIndicators implements ApplicationListener {
@Override public void onApplicationEvent(ApplicationEvent event) {
if (!(event instanceof ApplicationReadyEvent)) return;
ConfigurableListableBeanFactory beanFactory =
((ApplicationReadyEvent) event).getApplicationContext().getBeanFactory();
ZipkinHealthIndicator healthIndicator = beanFactory.getBean(ZipkinHealthIndicator.class);
for (Component component : beanFactory.getBeansOfType(Component.class).values()) {
healthIndicator.addComponent(component);
}
}
}
启动zipkin,访问下面地址,可以看到输出zipkin的健康检查信息
http://localhost:9411/health.json
{"status":"UP","zipkin":{"status":"UP","InMemoryStorage":{"status":"UP"}},"diskSpace":{"status":"UP","total":429495595008,"free":392936411136,"threshold":10485760}}
Zipkin默认的Collector使用http协议里收集Trace信息,客户端均调用/api/v1/spans或/api/v2/spans来上报trace信息
@Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler,
CollectorMetrics metrics) {
this.metrics = metrics.forTransport("http");
this.collector = Collector.builder(getClass())
.storage(storage).sampler(sampler).metrics(this.metrics).build();
}
@RequestMapping(value = "/api/v2/spans", method = POST)
public ListenableFuture<ResponseEntity<?>> uploadSpansJson2(
@RequestHeader(value = "Content-Encoding", required = false) String encoding,
@RequestBody byte[] body
) {
return validateAndStoreSpans(encoding, JSON2_DECODER, body);
}
ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, SpanDecoder decoder,
byte[] body) {
SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>();
metrics.incrementMessages();
if (encoding != null && encoding.contains("gzip")) {
try {
body = gunzip(body);
} catch (IOException e) {
metrics.incrementMessagesDropped();
result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n"));
}
}
collector.acceptSpans(body, decoder, new Callback<Void>() {
@Override public void onSuccess(@Nullable Void value) {
result.set(SUCCESS);
}
@Override public void onError(Throwable t) {
String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
result.set(t.getMessage() == null || message.startsWith("Cannot store")
? ResponseEntity.status(500).body(message + "\n")
: ResponseEntity.status(400).body(message + "\n"));
}
});
return result;
}
ZipkinHttpCollector中uploadSpansJson2方法接受所有/api/v2/spans请求,然后调用validateAndStoreSpans方法校验并存储Span
在validateAndStoreSpans方法中,当请求数据为gzip格式,会先解压缩,然后调用collector的acceptSpans方法
zipkin.collector.Collector的acceptSpans方法中,对各种格式的Span数据做了兼容处理,我们这里只看下V2版的JSON格式的Span是如何处理的,即会调用storage2(V2Collector)的acceptSpans方法
public class Collector
extends zipkin.internal.Collector<SpanDecoder, zipkin.Span> {
@Override
public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback<Void> callback) {
try {
if (decoder instanceof DetectingSpanDecoder) decoder = detectFormat(serializedSpans);
} catch (RuntimeException e) {
metrics.incrementBytes(serializedSpans.length);
callback.onError(errorReading(e));
return;
}
if (storage2 != null && decoder instanceof V2JsonSpanDecoder) {
storage2.acceptSpans(serializedSpans, SpanBytesDecoder.JSON_V2, callback);
} else {
super.acceptSpans(serializedSpans, decoder, callback);
}
}
}
zipkin.internal.V2Collector继承了zipkin.internal.Collector,而在Collector的acceptSpans方法中会调用decodeList先将传入的二进制数据转换成Span对象,然后调用accept方法,accept方法中会调用sampled方法,将需要采样的Span过滤出来,最后调用record方法将Span信息存入Storage中。
public abstract class Collector<D, S> {
protected void acceptSpans(byte[] serializedSpans, D decoder, Callback<Void> callback) {
metrics.incrementBytes(serializedSpans.length);
List<S> spans;
try {
spans = decodeList(decoder, serializedSpans);
} catch (RuntimeException e) {
callback.onError(errorReading(e));
return;
}
accept(spans, callback);
}
public void accept(List<S> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
metrics.incrementSpans(spans.size());
List<S> sampled = sample(spans);
if (sampled.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
record(sampled, acceptSpansCallback(sampled));
callback.onSuccess(null);
} catch (RuntimeException e) {
callback.onError(errorStoringSpans(sampled, e));
return;
}
}
List<S> sample(List<S> input) {
List<S> sampled = new ArrayList<>(input.size());
for (S s : input) {
if (isSampled(s)) sampled.add(s);
}
int dropped = input.size() - sampled.size();
if (dropped > 0) metrics.incrementSpansDropped(dropped);
return sampled;
}
}
V2Collector中的record方法会调用storage的accept方法,zipkin默认会使用InMemoryStorage来存储
public final class V2Collector extends Collector<BytesDecoder<Span>, Span> {
@Override protected List<Span> decodeList(BytesDecoder<Span> decoder, byte[] serialized) {
List<Span> out = new ArrayList<>();
if (!decoder.decodeList(serialized, out)) return Collections.emptyList();
return out;
}
@Override protected boolean isSampled(Span span) {
return sampler.isSampled(Util.lowerHexToUnsignedLong(span.traceId()), span.debug());
}
@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.spanConsumer().accept(sampled).enqueue(new V2CallbackAdapter<>(callback));
}
}
暴露了Zipkin对外的查询API,V1和V2的区别,主要是Span里的字段叫法不一样了,这里主要看下ZipkinQueryApiV2,ZipkinQueryApiV2方法都比较简单,主要是调用storage组件来实现查询功能。
dependencies - 查看所有trace的依赖关系
services - 查看所有的services
spans - 根据serviceName查询spans信息
traces - 根据serviceName,spanName,annotationQuery,minDuration,maxDuration等来搜索traces信息
trace/{traceIdHex} - 根据traceId查询某条trace信息
至此ZipkinServer的代码分析的差不多了,在后面博文中我们再具体分析各种Storage,和Collector的源代码。
我目前正在尝试基于哈希表构建字典。逻辑是:有一个名为 HashTable 的结构,其中包含以下内容: HashFunc HashFunc; PrintFunc PrintEntry; CompareF
如果我有一个指向结构/对象的指针,并且该结构/对象包含另外两个指向其他对象的指针,并且我想删除“包含这两个指针的对象而不破坏它所持有的指针”——我该怎么做这样做吗? 指向对象 A 的指针(包含指向对象
像这样的代码 package main import "fmt" type Hello struct { ID int Raw string } type World []*Hell
我有一个采用以下格式的 CSV: Module, Topic, Sub-topic 它需要能够导入到具有以下格式的 MySQL 数据库中: CREATE TABLE `modules` ( `id
通常我使用类似的东西 copy((uint8_t*)&POD, (uint8_t*)(&POD + 1 ), back_inserter(rawData)); copy((uint8_t*)&PODV
错误 : 联合只能在具有兼容列类型的表上执行。 结构(层:字符串,skyward_number:字符串,skyward_points:字符串)<> 结构(skyward_number:字符串,层:字符
我有一个指向结构的指针数组,我正在尝试使用它们进行 while 循环。我对如何准确初始化它并不完全有信心,但我一直这样做: Entry *newEntry = malloc(sizeof(Entry)
我正在学习 C,我的问题可能很愚蠢,但我很困惑。在这样的函数中: int afunction(somevariables) { if (someconditions)
我现在正在做一项编程作业,我并没有真正完全掌握链接,因为我们还没有涉及它。但是我觉得我需要它来做我想做的事情,因为数组还不够 我创建了一个结构,如下 struct node { float coef;
给定以下代码片段: #include #include #define MAX_SIZE 15 typedef struct{ int touchdowns; int intercepti
struct contact list[3]; int checknullarray() { for(int x=0;x<10;x++) { if(strlen(con
这个问题在这里已经有了答案: 关闭 11 年前。 Possible Duplicate: Empty “for” loop in Facebook ajax what does AJAX call
我刚刚在反射器中浏览了一个文件,并在结构构造函数中看到了这个: this = new Binder.SyntaxNodeOrToken(); 我以前从未见过该术语。有人能解释一下这个赋值在 C# 中的
我经常使用字符串常量,例如: DICT_KEY1 = 'DICT_KEY1' DICT_KEY2 = 'DICT_KEY2' ... 很多时候我不介意实际的文字是什么,只要它们是独一无二的并且对人类读
我是 C 的新手,我不明白为什么下面的代码不起作用: typedef struct{ uint8_t a; uint8_t* b; } test_struct; test_struct
您能否制作一个行为类似于内置类之一的结构,您可以在其中直接分配值而无需调用属性? 前任: RoundedDouble count; count = 5; 而不是使用 RoundedDouble cou
这是我的代码: #include typedef struct { const char *description; float value; int age; } swag
在创建嵌套列表时,我认为 R 具有对列表元素有用的命名结构。我有一个列表列表,并希望应用包含在任何列表中的每个向量的函数。 lapply这样做但随后剥离了列表的命名结构。我该怎么办 lapply嵌套列
我正在做一个用于学习目的的个人组织者,我从来没有使用过 XML,所以我不确定我的解决方案是否是最好的。这是我附带的 XML 文件的基本结构:
我是新来的 nosql概念,所以当我开始学习时 PouchDB ,我找到了这个转换表。我的困惑是,如何PouchDB如果可以说我有多个表,是否意味着我需要创建多个数据库?因为根据我在 pouchdb
我是一名优秀的程序员,十分优秀!