gpt4 book ai didi

spring-boot - Kafka消费者动态获取主题

转载 作者:行者123 更新时间:2023-12-05 04:02:17 30 4
gpt4 key购买 nike

我在 Spring Boot 中配置了一个 Kafka 消费者。这是配置类:

@EnableKafka
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

@Autowired
private Environment env;

@Bean
public ConsumerFactory<String, GenericData.Record> consumerFactory() {

dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("schema.registry.url"));
dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

这是消费者:

@Component
public class KafkaConsumer {

@Autowired
private MessageProcessor messageProcessor;

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(GenericData.Record message) {
messageProcessor.process();
}

}

请注意,我正在使用 topics = "#{'${kafka.topics}'.split(',')}" 从属性文件中选取主题。这就是我的 kafka.properties 文件的样子:

kafka.topics=pwdChange,pwdCreation
bootstrap.servers=aaa.bbb.com:37900
group.id=pwdManagement
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
schema.registry.url=http://aaa.bbb.com:37800

现在,如果我要向订阅添加一个新主题,比如 pwdExpire,并按如下方式修改 prop 文件:

kafka.topics=pwdChange,pwdCreation,pwdExpire

有没有办法让我的消费者在不重启服务器的情况下开始订阅这个新主题?我找到了这篇文章Spring Kafka - Subscribe new topics during runtime ,但文档对 metadata.max.age.ms 有这样的说法:

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

我觉得这行不通。感谢您的帮助!

最佳答案

没有;唯一的方法是使用主题模式;当添加新主题(与模式匹配)时,代理将在默认情况下在 5 分钟后将它们添加到订阅中。

但是,您可以在运行时为新主题添加新的监听器容器。

另一种选择是在子应用程序上下文中加载 @KafkaListener bean,并在每次主题更改时重新创建上下文。

编辑

请参阅 KafkaConsumer.subscribe(Pattern pattern) 的 javadocs...

/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* <p>
...

关于spring-boot - Kafka消费者动态获取主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54466662/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com