gpt4 book ai didi

java - 无法使用 Kafka 管理客户端 API 创建具有所需分区的 kafka 主题

转载 作者:行者123 更新时间:2023-12-01 18:27:34 25 4
gpt4 key购买 nike

我正在使用 Kafka 管理客户端 API 来创建主题。正在创建主题,但默认情况下使用 1 个分区创建主题。 API 不尊重所提供的可配置值。不确定我是否正确使用它。

注意:主题创建在代理级别启用。此外,该主题正在创建,但它是使用分区 1 创建的。

NewTopic newTopic = new NewTopic(TOPIC_NAME, 10, (short) 1);
CreateTopicsResult createTopicsResult = null;
try {
createTopicsResult = KafkaAdminClient.create(getAdminProperties()).createTopics(Collections.singletonList(newTopic));
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

但是,我可以使用 Kafka 管理客户端 API 增加早期创建的主题的分区

最佳答案

我尝试使用以下代码重现此问题,但没有成功:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AdminApiDemo {

private static final String BOOTSRAP_SERVER = "localhost:9092";
private static final String TOPIC_NAME = "demoTopic";
private static final int NUM_PARTITIONS = 3;
private static final short NUM_REPLICAS = 1;

private final AdminClient adminClient;

private AdminApiDemo(Properties properties) {
this.adminClient = KafkaAdminClient.create(properties);
}

public static void main(String[] args) {
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSRAP_SERVER);

new AdminApiDemo(properties).createTopic(TOPIC_NAME, NUM_PARTITIONS, NUM_REPLICAS);
}

private void createTopic(String topicName, int numPartitions, short numReplicas) {
try {
final NewTopic newTopic = new NewTopic(topicName, numPartitions, numReplicas);
final CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
result.values().get(topicName).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
}

kafka-topics --describe 显示以下内容:

root@kafka:/# kafka-topics --bootstrap-server localhost:9092 --describe --topic demoTopic
Topic:demoTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: demoTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: demoTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: demoTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1

我想,好吧,如果主题在创建之前可能存在怎么办,但我又得到了一个 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'demoTopic'已经存在。,所以这也不可能是你的情况。

我知道这不是“真正的”答案,它可以解决任何问题,对此感到抱歉。但无论如何,我希望它能有所帮助。也许其他人可以使用它在他的设置中重现它并“看到”问题。

关于java - 无法使用 Kafka 管理客户端 API 创建具有所需分区的 kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60209138/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com