- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
无法确定我的 kafka 监听器配置有什么问题。最初,我有一个名为“transactions”的非空 kafka 主题,其中有几条记录(我可以在 KafkaTool 中看到它)。这是application.yml:
spring:
###
# Kafka Settings
###
kafka:
consumer:
bootstrap-servers: localhost:9092
key-deserializer: com.panbet.externalbet.history.report.support.ReportsBetKeyJsonDeserializer
value-deserializer: com.panbet.externalbet.history.report.support.ReportsBetJsonDeserializer
group-id: external.history.group
这是 Java 配置文件:
@EnableKafka
@Configuration
public class KafkaConfig
{
private final KafkaProperties properties;
public KafkaConfig(KafkaProperties properties)
{
this.properties = properties;
}
@Bean
public ConsumerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaConsumerFactory()
{
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
}
我的 kafka 监听器看起来像(前面提到的主题“交易”的监听器):
@Component
public class ReportsConsumer
{
@KafkaListener(topics = { "transactions" })
public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
{
System.out.println(record);
}
}
我期望:当应用程序启动时,我将在 ReportsConsumer.listen 方法中捕获 debudder。但不幸的是,什么也没发生。 Listened 与 kafka 主题没有关联。有什么问题吗?谢谢。
最佳答案
默认情况下,新消费者从主题末尾开始消费。
设置spring.kafka.consumer.auto-offset-reset=earliest
我还建议 spring.kafka.consumer.enable-auto-commit=false
以便容器管理偏移量。
关于java - @KafkaListener不接收记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52834075/
我需要为监听器工厂生成的每个监听器定义自定义过滤策略。 目前,我正在使用 RecordFilterStrategy要做到这一点: @Bean ConcurrentKafkaListenerContai
我在 Spring Boot 项目中有一个 Spring KafkaListener: @KafkaListener(topics = "topic-one", groupId = "groupone
我将 spring-kafka 用于 springboot 2.0.4.RELEASE。 并使用KafkaListener获取消息 现在我想为我的组重置偏移量 但我不知道如何为该组获取消费者
需要什么 我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明 @KafkaListener 时设置了 autoStartup = "true",那么该应
试图弄清楚我是否可以使用 spring-kafka 和 spring-kafka-test 为 @KafkaListener 编写单元测试。 我的听众课。 public class MyKaf
我要创建并发@KafkaListener它可以处理多个主题,每个主题具有不同数量的分区。 我注意到 Spring-Kafka 只为大多数分区的主题为每个分区初始化一个使用者。 示例:我将并发设置为 8
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我正在尝试关注 https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递给@kafkalistener。
我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个 SpringBootTest使用 EmbeddedKafka测试这一切。 主要问题是:有时测试失
我正在使用具有以下配置的 spring-kafka: package com.danigu.fancypants.infrastructure; import com.fasterxml.jackso
我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和
我正在编写一个 Kafka 消费者。我需要将环境变量主题名称传递给 @KafkaListener(topics = ...)。这是我到目前为止所尝试过的: import org.springfram
我真的很难编写一个测试来检查当消息发送到指定主题时我的 Kafka Consumer 是否被正确调用。 我的消费者: @Service @Slf4j @AllArgsConstructor(onCon
使用 Spring 云流的 Kafka 绑定(bind)的消息生产者 @Component public static class PageViewEventSource implements App
我在我的 spring boot 应用程序中使用了 @KafkaListener 注释,而没有创建自定义 KafkaListenerContainerFactory bean。我目前正在我的 appl
我为主题数组创建了一个 bean,在运行时我向这个主题数组添加了一些主题,但消费者没有更新主题并且仍然从主题数组中的第一个主题消费。我希望消费者添加这些新主题并开始消费 @Autowired priv
我正在尝试使用来自 kafka 服务器的数据,我正在使用 @Kafkalistener注解。 问题是每次重新启动应用程序时我都会收到所有消息。 如何在我的应用程序中保存上次使用的偏移量并使用它来使用下
我编写了一个 JUnit 测试用例来测试 Spring Kafka 文档中“使用 Java 配置”类(class)中的代码。 ( https://docs.spring.io/spring-kafka
来自 Twitter 的问题: 只是想找到一个使用 spring-kafka 2.1.7 的简单示例,该示例与 KafkaListener 和 AckMode.MANUAL_IMMEDIATE 一起使
在我的项目中,我有很多 spring 管理的组件,它们做同样的事情。我想创建一个通用的 Util 类,它可以为我的所有组件执行所有通用操作。由于这个 Util 类需要访问环境变量和 beans,它的实
我是一名优秀的程序员,十分优秀!