gpt4 book ai didi

java - 使用 Kafka Streams 进行 OpenTracing - 如何?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:07:20 24 4
gpt4 key购买 nike

我正在尝试将 Jaeger 跟踪集成到 K-Streams 中。我正计划将跟踪添加到我最重要的几个管道中,并且想知道将 traceid 从一个管道传递到另一个管道的好方法是什么?

这是我目前所拥有的 - 在流处理管道开始时,我启动了一个服务器跨度并将 traceid 保存到状态存储中。稍后,在转换管道中,我访问 statestore 并从 transform() 方法捕获跟踪。这是在流处理中处理跟踪的好方法吗?

input
.mapValues(toSomethingThatMyAppUnderstands)
.mapValues(this::startStreamTrace)
.filter((k, v) -> v.isPresent())
.mapValues(Optional::get)
.mapValues(doSomethingHereWith)
.flatMapValues(doSomethingElse)
.filter((k, v) -> isInterestingEvent(v))
.transform(() -> new TransformerWithTracing<SomeObjectA, SomeObjectB>(IN_MEM_STORE_NAME, someFunction), IN_MEM_STORE_NAME)
.flatMapValues(c -> c)
.to(outTopic, Produced.with(Serdes.String(), new EventSerde()));



public class TransformerWithTracing<V, VR> implements Transformer<String, V, KeyValue<String, VR>> {

final Function valueAction;
final String storeId;
private KeyValueStore<String, String> traceIdStore;

public TransformerWithTracing(String storeId, Function valueAction) {
this.storeId = storeId;
this.valueAction = valueAction;
}

@Override
public void init(ProcessorContext context) {
// KeyValueStore store = ((KeyValueStore<String, String>) context.getStateStore(storeId));
InMemoryKeyValueStore inMemoryKeyValueStore = (InMemoryKeyValueStore) store;
this.traceIdStore = store;
}

@Override
public KeyValue<String, VR> transform(String key, V value) {
System.out.println(traceIdStore.get(key));

// BuildTraceHeader
try(Scope scope = serviceTracer.startServerSpan(traceHeader, "Converting to Enterprise Event")) {
return KeyValue.pair(key, (VR) valueAction.apply(value));
}
}

@Override
public KeyValue<String, VR> punctuate(long timestamp) {
return null;
}

@Override
public void close() {
// if (streamId != null) traceIdStore.delete(streamId);
}

}

最佳答案

@jeqo 在这个 zipkin/brave repo 中有类似的想法。

https://github.com/jeqo/brave/tree/kafka-streams-processor/instrumentation/kafka-streams

opentracing-contrib repo 中似乎也有一些可用的东西,但它似乎只在跟踪生产者/消费者级别。

https://github.com/opentracing-contrib/java-kafka-client/tree/master/opentracing-kafka-streams

  • 莱尼

关于java - 使用 Kafka Streams 进行 OpenTracing - 如何?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52875462/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com