- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Brave 库 https://github.com/openzipkin/brave用于跟踪,现在我想将它也用于 Kafka 消费者。我想避免添加 Spring Sleuth 并仅利用 Brave Kafka 检测 https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients .
对于 Kafka 消费者,我使用 @KafkaListener .代码如下所示:
TestKafkaEndpoint.java
@Service
public class TestKafkaEndpoint {
@KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
public void procesMyRequest(@Payload final MyRequest request) {
// do some magic...
}
}
和配置类
TestKafkaConfig.java
@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {
@Bean
public ConsumerFactory<String, MyRequest> testConsumerFactory() {
final Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testConsumerFactory());
factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
return factory;
}
但是不知道怎么用
卡夫卡消费者 使用 Kafka 工厂或利用
时KafkaTracing .有没有人有这方面的经验并让它工作?
最佳答案
我不熟悉它,但它看起来像TracingConsumer
是一个简单的消费者包装器:https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79
您应该能够创建 DefaultKafkaConsumerFactory
的子类;覆盖 createConsumer
方法 - 监听器容器使用...
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
... 调用 super.createConsumer(...) 并将其包装在
TracingConsumer
中.
ConsumerPostProcessor
到 DKCF。
关于java - 使用@KafkaListener 对 Kafka 消费者进行勇敢追踪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64896332/
我需要为监听器工厂生成的每个监听器定义自定义过滤策略。 目前,我正在使用 RecordFilterStrategy要做到这一点: @Bean ConcurrentKafkaListenerContai
我在 Spring Boot 项目中有一个 Spring KafkaListener: @KafkaListener(topics = "topic-one", groupId = "groupone
我将 spring-kafka 用于 springboot 2.0.4.RELEASE。 并使用KafkaListener获取消息 现在我想为我的组重置偏移量 但我不知道如何为该组获取消费者
需要什么 我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明 @KafkaListener 时设置了 autoStartup = "true",那么该应
试图弄清楚我是否可以使用 spring-kafka 和 spring-kafka-test 为 @KafkaListener 编写单元测试。 我的听众课。 public class MyKaf
我要创建并发@KafkaListener它可以处理多个主题,每个主题具有不同数量的分区。 我注意到 Spring-Kafka 只为大多数分区的主题为每个分区初始化一个使用者。 示例:我将并发设置为 8
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个 SpringBootTest使用 EmbeddedKafka测试这一切。 主要问题是:有时测试失
我正在使用具有以下配置的 spring-kafka: package com.danigu.fancypants.infrastructure; import com.fasterxml.jackso
我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和
我正在编写一个 Kafka 消费者。我需要将环境变量主题名称传递给 @KafkaListener(topics = ...)。这是我到目前为止所尝试过的: import org.springfram
我真的很难编写一个测试来检查当消息发送到指定主题时我的 Kafka Consumer 是否被正确调用。 我的消费者: @Service @Slf4j @AllArgsConstructor(onCon
使用 Spring 云流的 Kafka 绑定(bind)的消息生产者 @Component public static class PageViewEventSource implements App
我在我的 spring boot 应用程序中使用了 @KafkaListener 注释,而没有创建自定义 KafkaListenerContainerFactory bean。我目前正在我的 appl
我为主题数组创建了一个 bean,在运行时我向这个主题数组添加了一些主题,但消费者没有更新主题并且仍然从主题数组中的第一个主题消费。我希望消费者添加这些新主题并开始消费 @Autowired priv
我正在尝试使用来自 kafka 服务器的数据,我正在使用 @Kafkalistener注解。 问题是每次重新启动应用程序时我都会收到所有消息。 如何在我的应用程序中保存上次使用的偏移量并使用它来使用下
我编写了一个 JUnit 测试用例来测试 Spring Kafka 文档中“使用 Java 配置”类(class)中的代码。 ( https://docs.spring.io/spring-kafka
来自 Twitter 的问题: 只是想找到一个使用 spring-kafka 2.1.7 的简单示例,该示例与 KafkaListener 和 AckMode.MANUAL_IMMEDIATE 一起使
在我的项目中,我有很多 spring 管理的组件,它们做同样的事情。我想创建一个通用的 Util 类,它可以为我的所有组件执行所有通用操作。由于这个 Util 类需要访问环境变量和 beans,它的实
我是一名优秀的程序员,十分优秀!