zipkin-server源码分析(二)

源码地址

zipkin-server配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
zipkin:
self-tracing:
# Set to true to enable self-tracing.
enabled: ${SELF_TRACING_ENABLED:false}
# percentage of self-traces to retain
sample-rate: ${SELF_TRACING_SAMPLE_RATE:1.0}
# Interval in seconds to flush self-tracing data to storage.
flush-interval: ${SELF_TRACING_FLUSH_INTERVAL:1}
collector:
# percentage of traces to retain
sample-rate: ${COLLECTOR_SAMPLE_RATE:1.0}
kafka:
# ZooKeeper host string, comma-separated host:port value.
zookeeper: ${KAFKA_ZOOKEEPER:}
# Name of topic to poll for spans
topic: ${KAFKA_TOPIC:zipkin}
# Consumer group this process is consuming on behalf of.
group-id: ${KAFKA_GROUP_ID:zipkin}
# Count of consumer threads consuming the topic
streams: ${KAFKA_STREAMS:1}
# Maximum size of a message containing spans in bytes
max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}
scribe:
enabled: ${SCRIBE_ENABLED:false}
category: zipkin
port: ${COLLECTOR_PORT:9410}
query:
# 1 day in millis
lookback: ${QUERY_LOOKBACK:86400000}
# The Cache-Control max-age (seconds) for /api/v1/services and /api/v1/spans
names-max-age: 300
# CORS allowed-origins.
allowed-origins: "*"
storage:
strict-trace-id: ${STRICT_TRACE_ID:true}
type: ${STORAGE_TYPE:mem}
cassandra:
# Comma separated list of hosts / ip addresses part of Cassandra cluster.
contact-points: ${CASSANDRA_CONTACT_POINTS:localhost}
# Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin.
local-dc: ${CASSANDRA_LOCAL_DC:}
# Will throw an exception on startup if authentication fails.
username: ${CASSANDRA_USERNAME:}
password: ${CASSANDRA_PASSWORD:}
keyspace: ${CASSANDRA_KEYSPACE:zipkin}
# Max pooled connections per datacenter-local host.
max-connections: ${CASSANDRA_MAX_CONNECTIONS:8}
# Ensuring that schema exists, if enabled tries to execute script /zipkin-cassandra-core/resources/cassandra-schema-cql3.txt.
ensure-schema: ${CASSANDRA_ENSURE_SCHEMA:true}
# 7 days in seconds
span-ttl: ${CASSANDRA_SPAN_TTL:604800}
# 3 days in seconds
index-ttl: ${CASSANDRA_INDEX_TTL:259200}
# the maximum trace index metadata entries to cache
index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000}
# how long to cache index metadata about a trace. 1 minute in seconds
index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60}
# how many more index rows to fetch than the user-supplied query limit
index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3}
# Using ssl for connection, rely on Keystore
use-ssl: ${CASSANDRA_USE_SSL:false}
cassandra3:
# Comma separated list of hosts / ip addresses part of Cassandra cluster.
contact-points: ${CASSANDRA3_CONTACT_POINTS:localhost}
# Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin.
local-dc: ${CASSANDRA3_LOCAL_DC:}
# Will throw an exception on startup if authentication fails.
username: ${CASSANDRA3_USERNAME:}
password: ${CASSANDRA3_PASSWORD:}
keyspace: ${CASSANDRA3_KEYSPACE:zipkin3}
# Max pooled connections per datacenter-local host.
max-connections: ${CASSANDRA3_MAX_CONNECTIONS:8}
# Ensuring that schema exists, if enabled tries to execute script /cassandra3-schema.cql
ensure-schema: ${CASSANDRA3_ENSURE_SCHEMA:true}
# how many more index rows to fetch than the user-supplied query limit
index-fetch-multiplier: ${CASSANDRA3_INDEX_FETCH_MULTIPLIER:3}
# Using ssl for connection, rely on Keystore
use-ssl: ${CASSANDRA3_USE_SSL:false}
elasticsearch:
# host is left unset intentionally, to defer the decision
hosts: ${ES_HOSTS:}
pipeline: ${ES_PIPELINE:}
max-requests: ${ES_MAX_REQUESTS:64}
aws:
domain: ${ES_AWS_DOMAIN:}
region: ${ES_AWS_REGION:}
index: ${ES_INDEX:zipkin}
date-separator: ${ES_DATE_SEPARATOR:-}
index-shards: ${ES_INDEX_SHARDS:5}
index-replicas: ${ES_INDEX_REPLICAS:1}
username: ${ES_USERNAME:}
password: ${ES_PASSWORD:}
mysql:
host: ${MYSQL_HOST:localhost}
port: ${MYSQL_TCP_PORT:3306}
username: ${MYSQL_USER:}
password: ${MYSQL_PASS:}
db: ${MYSQL_DB:zipkin}
max-active: ${MYSQL_MAX_CONNECTIONS:10}
use-ssl: ${MYSQL_USE_SSL:false}
ui:
## Values below here are mapped to ZipkinUiProperties, served as /config.json
# Default limit for Find Traces
query-limit: 10
# The value here becomes a label in the top-right corner
environment:
# Default duration to look back when finding traces.
# Affects the "Start time" element in the UI. 1 hour in millis
default-lookback: 3600000
# Which sites this Zipkin UI covers. Regex syntax. (e.g. http:\/\/example.com\/.*)
# Multiple sites can be specified, e.g.
# - .*example1.com
# - .*example2.com
# Default is "match all websites"
instrumented: .*
server:
port: ${QUERY_PORT:9411}
compression:
enabled: true
# compresses any response over min-response-size (default is 2KiB)
# Includes dynamic json content and large static assets from zipkin-ui
mime-types: application/json,application/javascript,text/css,image/svg
spring:
mvc:
favicon:
# zipkin has its own favicon
enabled: false
autoconfigure:
exclude:
# otherwise we might initialize even when not needed (ex when storage type is cassandra)
- org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
info:
zipkin:
version: "@project.version@"
logging:
level:
# Silence Invalid method name: '__can__finagle__trace__v3__'
com.facebook.swift.service.ThriftServiceProcessor: 'OFF'
# # investigate /api/v1/dependencies
# zipkin.internal.DependencyLinker: 'DEBUG'
# # log cassandra queries (DEBUG is without values)
# com.datastax.driver.core.QueryLogger: 'TRACE'
# # log cassandra trace propagation
# com.datastax.driver.core.Message: 'TRACE'

zipkin-server启动

zipkin基于springboot

zipkin-server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Entry point of the Zipkin server: a Spring Boot application annotated with
 * {@link EnableZipkinServer}.
 */
@SpringBootApplication
@EnableZipkinServer
public class ZipkinServer {

  /**
   * Builds and runs the application, registering a {@code RegisterZipkinHealthIndicators}
   * listener and setting the property {@code spring.config.name=zipkin-server}.
   *
   * @param args command-line arguments, passed through to the Spring application
   */
  public static void main(String[] args) {
    SpringApplicationBuilder application = new SpringApplicationBuilder(ZipkinServer.class);
    application = application.listeners(new RegisterZipkinHealthIndicators());
    application = application.properties("spring.config.name=zipkin-server");
    application.run(args);
  }
}
导入
/**
 * Marker annotation for types. Through {@code @Import} it pulls in the server configuration,
 * the Brave configuration, the v1 query API and the HTTP collector listed below.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({
    ZipkinServerConfiguration.class,
    BraveConfiguration.class,
    ZipkinQueryApiV1.class,
    ZipkinHttpCollector.class
})
public @interface EnableZipkinServer {
}

step1.构建存储

  • StorageComponent
    image
  • SpanStore
    image
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    storage:
    strict-trace-id: ${STRICT_TRACE_ID:true}
    type: ${STORAGE_TYPE:mem}
    配置文件默认使用mem(内存)存储。
    可以通过springboot的外部化配置覆盖该值,例如设置环境变量STORAGE_TYPE,或者使用启动参数:
    --zipkin.storage.type=对应存储类型
    // Server-level Spring configuration (excerpt; other members are elided below).
    @Configuration
    public class ZipkinServerConfiguration {
    ...
    // Default storage configuration: applies when zipkin.storage.type is "mem" or is not set at
    // all, and only if no other StorageComponent bean has been defined.
    @Configuration
    // "matchIfMissing = true" ensures this is used when there's no configured storage type
    @ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "mem", matchIfMissing = true)
    @ConditionalOnMissingBean(StorageComponent.class)
    static class InMemoryConfiguration {
    // Builds the in-memory storage, passing through zipkin.storage.strict-trace-id (default true).
    @Bean StorageComponent storage(@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId) {
    return InMemoryStorage.builder().strictTraceId(strictTraceId).build();
    }
    }
    }

提供api

生成trace

rest入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * HTTP transport for span collection: accepts encoded span lists POSTed to
 * {@code /api/v1/spans} and hands them to a {@link Collector}.
 */
@RestController
@CrossOrigin("${zipkin.query.allowed-origins:*}")
public class ZipkinHttpCollector {
  static final ResponseEntity<?> SUCCESS = ResponseEntity.accepted().build();
  static final String APPLICATION_THRIFT = "application/x-thrift";

  final CollectorMetrics metrics;
  final Collector collector;

  /**
   * Scopes the supplied metrics to the "http" transport and builds the collector that all
   * uploads are delegated to.
   */
  @Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler,
      CollectorMetrics metrics) {
    this.metrics = metrics.forTransport("http");
    this.collector = Collector.builder(getClass())
        .storage(storage).sampler(sampler).metrics(this.metrics).build();
  }

  /**
   * Entry point for uploads that do not declare the thrift content type; the body is decoded
   * with {@code Codec.JSON}.
   *
   * @param encoding value of the optional {@code Content-Encoding} header, may be null
   * @param body raw request body
   * @return future completed with the HTTP response once the spans were accepted or rejected
   */
  @RequestMapping(value = "/api/v1/spans", method = POST)
  public ListenableFuture<ResponseEntity<?>> uploadSpansJson(
      @RequestHeader(value = "Content-Encoding", required = false) String encoding,
      @RequestBody byte[] body
  ) {
    return validateAndStoreSpans(encoding, Codec.JSON, body);
  }

  /**
   * Entry point for uploads sent as {@code application/x-thrift}; the body is decoded with
   * {@code Codec.THRIFT}.
   *
   * @param encoding value of the optional {@code Content-Encoding} header, may be null
   * @param body raw request body
   * @return future completed with the HTTP response once the spans were accepted or rejected
   */
  @RequestMapping(value = "/api/v1/spans", method = POST, consumes = APPLICATION_THRIFT)
  public ListenableFuture<ResponseEntity<?>> uploadSpansThrift(
      @RequestHeader(value = "Content-Encoding", required = false) String encoding,
      @RequestBody byte[] body
  ) {
    return validateAndStoreSpans(encoding, Codec.THRIFT, body);
  }

  /**
   * Counts the message, gunzips the body when the encoding mentions "gzip", and passes the
   * bytes to the collector.
   *
   * <p>The response is {@link #SUCCESS} when the collector reports success. On error it is a 500
   * when the error has no message or its message starts with "Cannot store", and a 400
   * otherwise. A body that cannot be gunzipped is answered with a 400 straight away.
   *
   * @param encoding value of the {@code Content-Encoding} header, may be null
   * @param codec codec used to decode the (possibly decompressed) body
   * @param body raw request body
   * @return future holding the response
   */
  ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, Codec codec,
      byte[] body) {
    SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>();
    metrics.incrementMessages();
    if (encoding != null && encoding.contains("gzip")) {
      try {
        body = gunzip(body);
      } catch (IOException e) {
        metrics.incrementMessagesDropped();
        result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n"));
        // Stop here: the body is still compressed, so it must not be handed to the collector
        // for decoding.
        return result;
      }
    }
    // Hand the spans to the collector; the callback completes the response future.
    collector.acceptSpans(body, codec, new Callback<Void>() {
      @Override public void onSuccess(@Nullable Void value) {
        result.set(SUCCESS);
      }

      @Override public void onError(Throwable t) {
        String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
        result.set(t.getMessage() == null || message.startsWith("Cannot store")
            ? ResponseEntity.status(500).body(message + "\n")
            : ResponseEntity.status(400).body(message + "\n"));
      }
    });
    return result;
  }
  // (remaining members omitted in this excerpt)
}

collector处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Decodes, samples and stores spans on behalf of a transport, reporting the outcome through a
// Callback and recording counters in CollectorMetrics (excerpt; "..." marks elided members).
public final class Collector {
/** Needed to scope this to the correct logging category */
public static Builder builder(Class<?> loggingClass) {
return new Builder(Logger.getLogger(checkNotNull(loggingClass, "loggingClass").getName()));
}
// Collects the collaborators of a Collector. The sampler defaults to ALWAYS_SAMPLE and the
// metrics to NOOP_METRICS; storage has no default and is null-checked by the Collector constructor.
public static final class Builder {
final Logger logger;
StorageComponent storage = null;
CollectorSampler sampler = CollectorSampler.ALWAYS_SAMPLE;
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
...
public Collector build() {
return new Collector(this);
}
}
final Logger logger;
final StorageComponent storage;
final CollectorSampler sampler;
final CollectorMetrics metrics;
// Copies the builder state; logger and storage are null-checked, while a null sampler or
// metrics falls back to the same defaults the builder starts with.
Collector(Builder builder) {
this.logger = checkNotNull(builder.logger, "logger");
this.storage = checkNotNull(builder.storage, "storage");
this.sampler = builder.sampler == null ? CollectorSampler.ALWAYS_SAMPLE : builder.sampler;
this.metrics = builder.metrics == null ? CollectorMetrics.NOOP_METRICS : builder.metrics;
}
// Decodes the serialized spans with the given codec and passes them to accept(). A
// RuntimeException thrown while decoding is reported via callback.onError and ends processing.
public void acceptSpans(byte[] serializedSpans, Codec codec, Callback<Void> callback) {
metrics.incrementBytes(serializedSpans.length);// record the received byte count
List<Span> spans;
try {
spans = codec.readSpans(serializedSpans);// decode the byte array into span objects
} catch (RuntimeException e) {
callback.onError(errorReading(e));
return;
}
accept(spans, callback);// sample and store the spans
}
...
// Samples the spans and hands the retained ones to the storage's span consumer. Success is
// reported for an empty input, for an input where nothing was sampled, and directly after the
// hand-off to storage returns; a RuntimeException from the hand-off is reported via onError.
public void accept(List<Span> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
metrics.incrementSpans(spans.size());
List<Span> sampled = sample(spans);
if (sampled.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
storage.asyncSpanConsumer().accept(sampled, acceptSpansCallback(sampled));// hand off to storage
// NOTE(review): the caller is told "success" here rather than from the storage callback;
// presumably intentional so that the transport does not wait for storage - confirm.
callback.onSuccess(null);
} catch (RuntimeException e) {
callback.onError(errorStoringSpans(sampled, e));
return;
}
}
// Sampling: keeps the spans the sampler accepts and counts the rest as dropped.
List<Span> sample(List<Span> input) {
List<Span> sampled = new ArrayList<>(input.size());
for (Span s : input) {
if (sampler.isSampled(s)) sampled.add(s);
}
int dropped = input.size() - sampled.size();
if (dropped > 0) metrics.incrementSpansDropped(dropped);
return sampled;
}
...
}

InMemorySpanStore最终处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/** Internally, spans are indexed on 64-bit trace ID */
public final class InMemorySpanStore implements SpanStore {
private final Multimap<Long, Span> traceIdToSpans = new LinkedListMultimap<>();// trace ID -> spans
// (trace ID, timestamp) pairs; presumably ordered by timestamp descending, going by the
// comparator's name - confirm.
private final Set<Pair<Long>> traceIdTimeStamps = new TreeSet<>(VALUE_2_DESCENDING);// trace ID + timestamp
// service name -> (trace ID, timestamp) pairs
private final Multimap<String, Pair<Long>> serviceToTraceIdTimeStamp =
new SortedByValue2Descending<>();
private final Multimap<String, String> serviceToSpanNames =
new LinkedHashSetMultimap<>();// service name -> span names
private final boolean strictTraceId;
// Incremented once per stored span, inside the synchronized block of spanConsumer.accept.
volatile int acceptedSpanCount;
// Historical constructor
public InMemorySpanStore() {
this(new InMemoryStorage.Builder());
}
// Takes the strictTraceId setting from the storage builder.
InMemorySpanStore(InMemoryStorage.Builder builder) {
this.strictTraceId = builder.strictTraceId;
}
// Consumer that writes incoming spans into the in-memory indexes above.
final StorageAdapters.SpanConsumer spanConsumer = new StorageAdapters.SpanConsumer() {
// For each span: pairs its trace ID with guessTimestamp(span), using Long.MIN_VALUE when that
// is null, then updates all indexes while holding the lock on the enclosing store.
@Override public void accept(List<Span> spans) {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Pair<Long> traceIdTimeStamp =
Pair.create(span.traceId, timestamp == null ? Long.MIN_VALUE : timestamp);
String spanName = span.name;
synchronized (InMemorySpanStore.this) {
traceIdTimeStamps.add(traceIdTimeStamp);
traceIdToSpans.put(span.traceId, span);
acceptedSpanCount++;
// index the span under every service name it reports
for (String serviceName : span.serviceNames()) {
serviceToTraceIdTimeStamp.put(serviceName, traceIdTimeStamp);
serviceToSpanNames.put(serviceName, spanName);
}
}
}
}
@Override public String toString() {
return "InMemorySpanConsumer";
}
};
...
}

查询trace

查询api读取的是配置文件中所配置的storage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
 * Read-side REST API rooted at {@code /api/v1}. Every request is answered from the span store
 * of the injected {@link StorageComponent}.
 */
@RestController
@RequestMapping("/api/v1")
@CrossOrigin("${zipkin.query.allowed-origins:*}")
public class ZipkinQueryApiV1 {

  /** Lookback (millis) applied when a request does not supply one. */
  @Autowired
  @Value("${zipkin.query.lookback:86400000}")
  int defaultLookback = 86400000; // 1 day in millis

  /** The Cache-Control max-age (seconds) for /api/v1/services and /api/v1/spans */
  @Value("${zipkin.query.names-max-age:300}")
  int namesMaxAge = 300; // 5 minutes

  volatile int serviceCount; // used as a threshold to start returning cache-control headers

  private final StorageComponent storage;

  @Autowired
  public ZipkinQueryApiV1(StorageComponent storage) {
    this.storage = storage; // don't cache spanStore here as it can cause the app to crash!
  }

  /**
   * Returns the JSON-encoded dependency links ending at {@code endTs}.
   *
   * @param endTs required end timestamp of the query window
   * @param lookback optional window length; the configured default is used when absent
   */
  @RequestMapping(value = "/dependencies", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
  public byte[] getDependencies(@RequestParam(value = "endTs", required = true) long endTs,
      @RequestParam(value = "lookback", required = false) Long lookback) {
    long window = lookbackOrDefault(lookback);
    return Codec.JSON.writeDependencyLinks(storage.spanStore().getDependencies(endTs, window));
  }

  /** Lists the service names and remembers how many there are for the caching threshold. */
  @RequestMapping(value = "/services", method = RequestMethod.GET)
  public ResponseEntity<List<String>> getServiceNames() {
    List<String> found = storage.spanStore().getServiceNames();
    serviceCount = found.size();
    return maybeCacheNames(found);
  }

  /** Lists the span names recorded for the given service. */
  @RequestMapping(value = "/spans", method = RequestMethod.GET)
  public ResponseEntity<List<String>> getSpanNames(
      @RequestParam(value = "serviceName", required = true) String serviceName) {
    List<String> found = storage.spanStore().getSpanNames(serviceName);
    return maybeCacheNames(found);
  }

  /**
   * Builds a {@code QueryRequest} from the request parameters and returns the matching traces
   * as a JSON string. A missing {@code lookback} falls back to the configured default.
   */
  @RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
  public String getTraces(
      @RequestParam(value = "serviceName", required = false) String serviceName,
      @RequestParam(value = "spanName", defaultValue = "all") String spanName,
      @RequestParam(value = "annotationQuery", required = false) String annotationQuery,
      @RequestParam(value = "minDuration", required = false) Long minDuration,
      @RequestParam(value = "maxDuration", required = false) Long maxDuration,
      @RequestParam(value = "endTs", required = false) Long endTs,
      @RequestParam(value = "lookback", required = false) Long lookback,
      @RequestParam(value = "limit", required = false) Integer limit) {
    long window = lookbackOrDefault(lookback);
    QueryRequest queryRequest = QueryRequest.builder()
        .serviceName(serviceName)
        .spanName(spanName)
        .parseAnnotationQuery(annotationQuery)
        .minDuration(minDuration)
        .maxDuration(maxDuration)
        .endTs(endTs)
        .lookback(window)
        .limit(limit)
        .build();
    byte[] json = Codec.JSON.writeTraces(storage.spanStore().getTraces(queryRequest));
    return new String(json, UTF_8);
  }

  /**
   * Returns one trace as a JSON string. The high 64 bits of the ID are parsed only from
   * 32-character IDs and are 0 otherwise. When a {@code raw} request parameter is present the
   * raw trace is fetched instead.
   *
   * @throws TraceNotFoundException when the span store returns null for the ID
   */
  @RequestMapping(value = "/trace/{traceIdHex}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
  public String getTrace(@PathVariable String traceIdHex, WebRequest request) {
    long traceIdHigh = 0L;
    if (traceIdHex.length() == 32) {
      traceIdHigh = lowerHexToUnsignedLong(traceIdHex, 0);
    }
    long traceIdLow = lowerHexToUnsignedLong(traceIdHex);

    // RequestParam doesn't work for param w/o value
    boolean rawRequested = request.getParameterValues("raw") != null;
    List<Span> trace;
    if (rawRequested) {
      trace = storage.spanStore().getRawTrace(traceIdHigh, traceIdLow);
    } else {
      trace = storage.spanStore().getTrace(traceIdHigh, traceIdLow);
    }

    if (trace == null) {
      throw new TraceNotFoundException(traceIdHex, traceIdHigh, traceIdLow);
    }
    return new String(Codec.JSON.writeSpans(trace), UTF_8);
  }

  /** Maps {@link TraceNotFoundException} to an empty 404 response. */
  @ExceptionHandler(TraceNotFoundException.class)
  @ResponseStatus(HttpStatus.NOT_FOUND)
  public void notFound() {
  }

  /** Raised when no trace exists for the requested ID. */
  static class TraceNotFoundException extends RuntimeException {
    public TraceNotFoundException(String traceIdHex, Long traceIdHigh, long traceId) {
      super(String.format("Cannot find trace for id=%s, parsed value=%s", traceIdHex,
          parsedValue(traceIdHigh, traceId)));
    }

    /** Renders the parsed ID as "high,low", or just the low part when there is no high part. */
    private static String parsedValue(Long traceIdHigh, long traceId) {
      if (traceIdHigh == null) {
        return String.valueOf(traceId);
      }
      return traceIdHigh + "," + traceId;
    }
  }

  /**
   * We cache names if there are more than 3 services. This helps people getting started: if we
   * cache empty results, users have more questions. We assume caching becomes a concern when zipkin
   * is in active use, and active use usually implies more than 3 services.
   */
  ResponseEntity<List<String>> maybeCacheNames(List<String> names) {
    ResponseEntity.BodyBuilder builder = ResponseEntity.ok();
    boolean cacheable = serviceCount > 3;
    if (cacheable) {
      CacheControl policy = CacheControl.maxAge(namesMaxAge, TimeUnit.SECONDS).mustRevalidate();
      builder.cacheControl(policy);
    }
    return builder.body(names);
  }

  /** Returns the supplied lookback, or the configured default when none was supplied. */
  private long lookbackOrDefault(Long lookback) {
    if (lookback != null) {
      return lookback;
    }
    return defaultLookback;
  }
}

PS

  • 如果更改了存储的类型,默认会直接切换,比如zipkin.storage.type=elasticsearch,基于springboot的autoconfigure原则,ZipkinElasticsearchHttpStorageAutoConfiguration会执行,条件成立时会直接创建elasticsearchStorage
  • 当前只查看了inMemory的流程如果有兴趣其他流程可以自己去看

贴下流程图

  • zipkin-server接收插入请求-inMemory
    zipkin-server -inmemory

  • zipkin-server接收查询请求-inMemory
    zipkin-server -inmemory

  • 项目源码
    image

参考