gpt4 book ai didi

java - 我们如何使用 API 从 IDE 在 Kafka 中创建主题

转载 作者:IT老高 更新时间:2023-10-28 20:32:58 24 4
gpt4 key购买 nike

我们如何使用 API 从 IDE 在 Kafka 中创建主题,因为当我这样做时:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

我得到错误:

bash: bin/kafka-create-topic.sh: No such file or directory

我按照开发人员的设置进行操作。

最佳答案

在 Kafka 0.8.1+(截至目前最新版本的 Kafka)中,您可以通过 AdminCommand 以编程方式创建新主题。 CreateTopicCommand(旧版 Kafka 0.8.0 的一部分)的功能已移至 AdminCommand

Kafka 0.8.1 的 Scala 示例:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
// createTopic() will only seem to work (it will return without error). The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

构建依赖,以 sbt 为例:

libraryDependencies ++= Seq(
"com.101tec" % "zkclient" % "0.4",
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri"),
...
)

编辑:为 Kafka 0.9.0.0 添加了 Java 示例(截至 2016 年 1 月的最新版本)。

Maven 依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>

代码:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
// createTopic() will only seem to work (it will return without error). The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);

// Security for Kafka was added in Kafka 0.9.0.0
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

String topic = "my-topic";
int partitions = 2;
int replication = 3;
Properties topicConfig = new Properties(); // add per-topic configurations settings here
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
zkClient.close();
}

}

编辑 2:为 Kafka 0.10.2.0 添加了 Java 示例(截至 2017 年 4 月的最新版本)。

Maven 依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
</dependency>

代码:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;

String topic = "my-topic";
int partitions = 2;
int replication = 3;
Properties topicConfig = new Properties(); // add per-topic configurations settings here

// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
// createTopic() will only seem to work (it will return without error). The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);

// Security for Kafka was added in Kafka 0.9.0.0
boolean isSecureKafkaCluster = false;

ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
zkClient.close();
}

}

关于java - 我们如何使用 API 从 IDE 在 Kafka 中创建主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16946778/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com