gpt4 book ai didi

java - 如何使用 Kafka 1.1.0 以编程方式创建主题

转载 作者:行者123 更新时间:2023-11-30 02:06:12 27 4
gpt4 key购买 nike

我最近升级到了 Kafka 1.1.0。我正在尝试为 kafka 消费者创建单元测试。为此,如果单元测试可以创建用于测试的主题,那将是理想的选择。我发现一些代码看起来应该可以实现我想要的功能。但是,当我运行它时,它会抛出异常: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/Au​​toCloseable;Ljava/lang/String;)V

这是我在网上找到的创建主题的代码:

@BeforeClass
public static void createTopic() {
try (final AdminClient adminClient = AdminClient.create(configure())) {
try {
// Define topic
NewTopic newTopic = new NewTopic("test-orders", 1, (short)1);

// Create topic, which is async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

// Since the call is Async, Lets wait for it to complete.
createTopicsResult.values().get(ordersTopic).get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
}

但是当我运行它时它抛出异常。

java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:334)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at com.sial.notifications.topics.OrdersTopicsTests.createTopic(OrdersTopicsTests.java:162)

我传递给它的唯一配置参数是引导服务器和 client.id。我究竟做错了什么?看起来很简单

最佳答案

当我针对 1.1.0 代理单独运行时,这个稍微修改过的代码对我有用:

public static void main(String[] args) {
final String ordersTopic = "test-orders";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (final AdminClient adminClient = AdminClient.create(props)) {
try {
// Define topic
NewTopic newTopic = new NewTopic(ordersTopic, 1, (short)1);

// Create topic, which is async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

// Since the call is Async, Lets wait for it to complete.
createTopicsResult.values().get(ordersTopic).get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException))
throw new RuntimeException(e.getMessage(), e);
}
}
}

由于这与您的代码非常相似,并且根据您看到的错误,也许您还没有完全理清对 Kafka 库的依赖关系?我使用了 Maven 工件 org.apache.kafka:kafka_2.12:1.1.0

关于java - 如何使用 Kafka 1.1.0 以编程方式创建主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51274085/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com