gpt4 book ai didi

java - 我如何使用 Kafka Consumer 的动态主题

转载 作者:行者123 更新时间:2023-12-01 21:57:29 34 4
gpt4 key购买 nike

下面给出 Kafka 的配置来定位服务器 @ bean 公共(public) ConsumerFactory ConsumerFactory() { 映射配置 = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return new DefaultKafkaConsumerFactory<>(config);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}



here I need to change the topic as a dynamic content
@KafkaListener(topics = "mytopic")
private void consume2(String Data) {
String userID = null;
String topic = null;
String message = null;
System.out.println(Data);
}

最佳答案

你有没有尝试过类似的事情

@KafkaListener(topics = "${kafka.topics}")

并将其设置为环境变量或应用程序属性?

环境变量KAFKA_TOPICS=mytopicapplication.properties kafka.topics=mytopic

关于java - 我如何使用 Kafka Consumer 的动态主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58732155/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com