gpt4 book ai didi

java - 使用@KafkaListener 对 Kafka 消费者进行勇敢追踪

转载 作者:行者123 更新时间:2023-12-04 08:33:57 26 4
gpt4 key购买 nike

我正在使用 Brave 库 https://github.com/openzipkin/brave用于跟踪,现在我想将它也用于 Kafka 消费者。我想避免添加 Spring Sleuth 并仅利用 Brave Kafka 检测 https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients .
对于 Kafka 消费者,我使用 @KafkaListener .代码如下所示:
TestKafkaEndpoint.java

@Service
public class TestKafkaEndpoint {

@KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
public void procesMyRequest(@Payload final MyRequest request) {
// do some magic...
}
}
和配置类 TestKafkaConfig.java

@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {

@Bean
public ConsumerFactory<String, MyRequest> testConsumerFactory() {
final Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testConsumerFactory());
factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
return factory;
}

但是不知道怎么用 卡夫卡消费者 使用 Kafka 工厂或利用 时KafkaTracing .有没有人有这方面的经验并让它工作?

最佳答案

我不熟悉它,但它看起来像TracingConsumer是一个简单的消费者包装器:https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79
您应该能够创建 DefaultKafkaConsumerFactory 的子类;覆盖 createConsumer方法 - 监听器容器使用...

this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
... 调用 super.createConsumer(...) 并将其包装在 TracingConsumer 中.
如果您使用的是 2.5.3 或更高版本,则可以添加 ConsumerPostProcessor到 DKCF。
这就是侦探的工作方式:
https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285

关于java - 使用@KafkaListener 对 Kafka 消费者进行勇敢追踪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64896332/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com