gpt4 book ai didi

java - @KafkaListener不接收记录

转载 作者:行者123 更新时间:2023-12-02 10:42:28 26 4
gpt4 key购买 nike

无法确定我的 kafka 监听器配置有什么问题。最初,我有一个名为“transactions”的非空 kafka 主题,其中有几条记录(我可以在 KafkaTool 中看到它)。这是application.yml:

spring:
###
# Kafka Settings
###
kafka:
consumer:
bootstrap-servers: localhost:9092
key-deserializer: com.panbet.externalbet.history.report.support.ReportsBetKeyJsonDeserializer
value-deserializer: com.panbet.externalbet.history.report.support.ReportsBetJsonDeserializer
group-id: external.history.group

这是 Java 配置文件:

@EnableKafka
@Configuration
public class KafkaConfig
{
private final KafkaProperties properties;


public KafkaConfig(KafkaProperties properties)
{
this.properties = properties;
}


@Bean
public ConsumerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaConsumerFactory()
{
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
}


@Bean
public ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
}

我的 kafka 监听器看起来像(前面提到的主题“交易”的监听器):

@Component
public class ReportsConsumer
{
@KafkaListener(topics = { "transactions" })
public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
{
System.out.println(record);
}
}

我期望:当应用程序启动时,我将在 ReportsConsumer.listen 方法中捕获 debudder。但不幸的是,什么也没发生。 Listened 与 kafka 主题没有关联。有什么问题吗?谢谢。

最佳答案

默认情况下,新消费者从主题末尾开始消费。

设置spring.kafka.consumer.auto-offset-reset=earliest

我还建议 spring.kafka.consumer.enable-auto-commit=false 以便容器管理偏移量。

关于java - @KafkaListener不接收记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52834075/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com