gpt4 book ai didi

java - 如何在使用 Spring 创建期间配置 kafka 主题保留策略?

转载 作者:行者123 更新时间:2023-12-03 12:06:12 25 4
gpt4 key购买 nike

我需要在创建期间配置特定主题的保留策略。我试图寻找解决方案,我只能找到如下命令级别的更改命令

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000



有人可以让我知道在创建过程中配置它的方法,例如 spring-mvc 中的 xml 或属性文件配置。

最佳答案

Spring Kafka 允许您通过声明 @Bean 来创建新主题s 在您的应用程序上下文中。这将需要一个类型为 KafkaAdmin 的 bean在应用程序上下文中,如果使用 Spring Boot,它将自动创建。您可以按如下方式定义主题:

@Bean
public NewTopic myTopic() {
return TopicBuilder.name("my-topic")
.partitions(4)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
.build();
}
如果您不使用 Spring Boot,则还必须定义 KafkaAdmin bean 角,扁 bean :
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
return new KafkaAdmin(configs);
}
如果要编辑现有主题的配置,则必须使用 AdminClient ,这是更改 retention.ms 的代码段在主题级别:
Map<String, Object> config = new HashMap<>();                
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

AdminClient client = AdminClient.create(config);

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");

// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
configs.put(resource, Arrays.asList(op));

AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
alterConfigsResult.all();
可以使用此 @PostConstruct 自动设置配置接收 NewTopic 的方法 bean 。

@Autowired
private Set<NewTopic> topics;

@PostConstruct
public void reconfigureTopics() throws ExecutionException, InterruptedException {

try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
adminClient.incrementalAlterConfigs(topics.stream()
.filter(topic -> topic.configs() != null)
.collect(Collectors.toMap(
topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
topic -> topic.configs().entrySet()
.stream()
.map(e -> new ConfigEntry(e.getKey(), e.getValue()))
.peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
.map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
.collect(Collectors.toList())
)))
.all()
.get();
}

}

关于java - 如何在使用 Spring 创建期间配置 kafka 主题保留策略?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56539335/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com