gpt4 book ai didi

java - Spring Boot Kafka中多个消费者如何监听多个主题?

转载 作者:行者123 更新时间:2023-12-01 19:49:57 28 4
gpt4 key购买 nike

当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接受 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试访问 topic1 中的事件消息时,我无法访问,但可以访问用户主题消息。

例如:

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Autowired
private Environment environment;

@Bean
public ConsumerFactory<String,User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(User.class));

}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Event.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}
}

我的主要应用程序如下:

@KafkaListener(topics = "${event.topic}")
public void processEvent(Event event) {
..do something..
..post the message to User topic
}
@KafkaListener(topics = "${user.topic}")
public void processUser(User user) {
..do something..
}

我的需要是首先监听事件主题并对消息进行一些处理,然后将其发送给用户主题,我有另一种方法可以监听用户主题并对该消息执行某些操作。我尝试将不同的选项传递给@KafkaListener,例如

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")

但它不起作用..我不确定出了什么问题..任何建议都有帮助!

最佳答案

如果没有在bean中指定名称,则方法名称将为bean名称,在@KafkaListener中添加带有groupid的bean名称

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")

@Bean中指定名称并将该名称添加到@kafkaListener

@Bean(name="kafkaListenerContainerFactoryEvent")
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}

关于java - Spring Boot Kafka中多个消费者如何监听多个主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51794710/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com