gpt4 book ai didi

java - 如何通过 Java 在 Kafka 中创建主题

转载 作者:IT老高 更新时间:2023-10-28 20:53:30 25 4
gpt4 key购买 nike

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符中创建一个主题,并且如果我通过 java api 推送消息,它工作正常。但我想通过java api创建一个主题。经过长时间的搜索,我找到了下面的代码,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我尝试了上面的代码,它显示主题已创建,但我无法在主题中推送消息。我的代码有什么问题吗?或者任何其他方式来实现上述目标?

最佳答案

Edit - Zookeeper is not required in newer version of Kafka. Please see answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+



原答案

我修好了..经过长时间的研究..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

从上面的代码中,ZkClient 会创建一个主题,但是这个主题信息对于 kafka 是没有感知的。所以我们要做的是,我们需要通过以下方式为 ZkClient 创建对象,

首先导入下面的语句,

import kafka.utils.ZKStringSerializer$;

并通过以下方式为 ZkClient 创建对象,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

编辑 1:(用于@ajkret 评论)

The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9


import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs

zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}

关于java - 如何通过 Java 在 Kafka 中创建主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27036923/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com