gpt4 book ai didi

java - kafka AdminClient API 等待节点分配超时

转载 作者:行者123 更新时间:2023-12-01 08:39:22 24 4
gpt4 key购买 nike

我是 Kafka 的新手,正在尝试使用 AdminClient用于管理在我的本地机器上运行的 Kafka 服务器的 API。我的设置与 quick start 中的完全相同Kafka 文档的部分。唯一的区别是我没有创建任何主题。

我在此设置上运行任何 shell 脚本都没有问题,但是当我尝试运行以下 java 代码时:

public class ProducerMain{

public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");


try(final AdminClient adminClient =
KafkaAdminClient.create(props)){

try {
final NewTopic newTopic = new NewTopic("test", 1,
(short)1);

final CreateTopicsResult createTopicsResult =
adminClient.createTopics(
Collections.singleton(newTopic));

createTopicsResult.all().get();

}catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}

错误: TimeoutException: Timed out waiting for a node assignment
Exception in thread "main" java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at ProducerMain.main(ProducerMain.java:41)
<br>Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
at ProducerMain.main(ProducerMain.java:38)
<br>Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

我在网上搜索了有关问题可能是什么的指示,但到目前为止一无所获。任何建议都是受欢迎的,因为我已经走到了尽头。

最佳答案

听起来你的经纪人不健康......

这段代码工作正常

public class Main {

static final Logger logger = LoggerFactory.getLogger(Main.class);

public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "local-test");
properties.setProperty(AdminClientConfig.RETRIES_CONFIG, "3");

try (AdminClient client = AdminClient.create(properties)) {
final CreateTopicsResult res = client.createTopics(
Collections.singletonList(
new NewTopic("foo", 1, (short) 1)
)
);
res.all().get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error("unable to create topic", e);
}
}
}

我可以在代理日志中看到该主题已创建

关于java - kafka AdminClient API 等待节点分配超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51325076/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com