gpt4 book ai didi

spring-boot - 以编程方式配置监听器而不是使用注释

转载 作者:行者123 更新时间:2023-12-03 08:53:11 25 4
gpt4 key购买 nike

我已经read the documentation about receiving messages :

You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

但是我无法让它正常工作。我正在使用 Spring Boot 2.1.2,并且可能在 Spring Soup with Beans 中加盐过多,这对我来说弊大于利,所以我想了解它应该如何工作,这样我可以检查我偏离了哪里荣耀之路。

如果我正确理解文档,拥有 MessageListenerContainer 就足够了配置,例如这里:

@Configuration
public class MyKafkaConfiguration {
@Bean
public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset("spring-kafka-stackoverflow-questions", /* partition */ 0, /* Offset */ 0L));
KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory, cProps);
result.setupMessageListener(MessageListener<String, String) System.out::println);
return result;
}
}

这会毫无异常(exception)地启动,但似乎并没有真正监听代理上的任何消息。根据我从通常的注释流程中得到的信息,需要有人以 KafkaListenerEndpoint 的形式注册监听器。到KafkaListenerEndpointRegistry .

这是由 KafkaListenerAnnotationBeanPostPorcessor 自动完成的对于 @KafkaListener 注释的所有方法但这在我想走这条路的情况下应该如何工作

by configuring a MessageListenerContainer and providing a message listener

而不是

using the @KafkaListener annotation

我不太明白。 KafkaAutoConfiguration 中也没有方法(Spring Boot 提供),例如需要 List<MessageListenerContainer>并自动将它们全部注册到注册表中,因此这并不奇怪。

但是,正如文档所示,它最初应该如何工作?我误解了那部分吗?有人可以告诉我吗?

最佳答案

我刚刚将您的 bean 复制到新的启动应用程序中,它运行得很好。

端点注册表仅适用于@KafkaListener容器,因为它们没有在应用程序上下文中注册为bean(注册表就是bean)。


@SpringBootApplication
public class So57628247Application {

private static final int MessageListener = 0;

public static void main(String[] args) {
SpringApplication.run(So57628247Application.class, args);
}

@Bean
public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset(
"spring-kafka-stackoverflow-questions", /* partition */ 0, /* Offset */ 0L));
KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory,
cProps);
result.setupMessageListener((MessageListener<String, String>) System.out::println);
return result;
}

@Bean
public NewTopic topic() {
return new NewTopic("spring-kafka-stackoverflow-questions", 1, (short) 1);
}


@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("spring-kafka-stackoverflow-questions", "foo");
};
}

}

ConsumerRecord(topic = spring-kafka-stackoverflow-questions, partition = 0, offset = 0, CreateTime = 1566573407373, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)

关于spring-boot - 以编程方式配置监听器而不是使用注释,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57628247/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com