gpt4 book ai didi

spring - Kafka Consumer 在 Spring Boot 中没有收到消息

转载 作者:行者123 更新时间:2023-12-04 13:58:46 24 4
gpt4 key购买 nike

我的 spring/java 消费者无法访问生产者产生的消息。但是,当我从控制台/终端运行消费者时,它能够接收由 spring/java 生产者生成的消息。

消费者配置:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

private String bootstrap;
private String group;
private String topic;

public String getBootstrap() {
return bootstrap;
}

public void setBootstrap(String bootstrap) {
this.bootstrap = bootstrap;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}
}

监听器配置:
@Configuration
@EnableKafka
public class KafkaListenerConfig {

@Autowired
private KafkaConsumerProperties kafkaConsumerProperties;

@Bean
public Map<String, Object> getConsumerProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return properties;
}

@Bean
public Deserializer stringKeyDeserializer() {
return new StringDeserializer();
}

@Bean
public Deserializer transactionJsonValueDeserializer() {
return new JsonDeserializer(Transaction.class);
}

@Bean
public ConsumerFactory<String, Transaction> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
return factory;
}

}

卡夫卡听众:
@Service
public class TransactionConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

@KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
public void onReceive(Transaction transaction) {
LOGGER.info("transaction = {}",transaction);
}
}

消费者应用:
@SpringBootApplication
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}


测试案例 1:通过
我启动了我的 spring/java 生产者并从控制台运行消费者。当我从生产者中生成消息时,我的控制台消费者能够访问该消息。

测试案例 2:失败
我启动了我的 spring/java 消费者并从控制台运行生产者。当我从控制台生产者中生成消息时,我的 spring/java 消费者无法访问该消息。

测试案例 3:失败
我启动了我的 spring/java 消费者并运行了 spring/java 生产者。当我从 spring/java 生产者生成消息时,我的 spring/java 消费者无法访问该消息。

问题
  • 我的消费者代码有什么问题吗?
  • 我是否缺少 kafka-listener 的任何配置?
  • 我是否需要显式运行监听器? (我不这么认为,因为我可以在连接到主题的终端日志上看到,我仍然不确定)
  • 最佳答案

    好的,你不见了 AUTO_OFFSET_RESET_CONFIGConsumer Configs

    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    auto.offset.reset

    What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):



    最早:自动将偏移量重置为最早的偏移量

    最新:自动将偏移量重置为最新的偏移量

    无:如果没有找到消费者组的先前偏移量,则向消费者抛出异常

    其他:向消费者抛出异常

    注意: auto.offset.resetearliest仅当 kafka 没有该消费者组的偏移量时才会工作(因此,在您的情况下,需要使用新的消费者组添加此属性并重新启动应用程序)

    关于spring - Kafka Consumer 在 Spring Boot 中没有收到消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55620992/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com