gpt4 book ai didi

java - 如何在 Java 中创建 Kafka ZKStringSerializer?

转载 作者:搜寻专家 更新时间:2023-10-31 08:11:57 26 4
gpt4 key购买 nike

在搜索如何通过 API 创建 Kafka 主题时,我在 Scala 中找到了这个示例:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
connectionTimeoutMs, ZKStringSerializer)

// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName,
numPartitions, replicationFactor, topicConfig)

来源:https://stackoverflow.com/a/23360100/871012

最后一个参数 ZKStringSerializer 显然是一个 Scala 对象。我不清楚如何使这个示例在 Java 中运行。

此帖How to create a scala object in clojure在 Clojure 中问同样的问题,答案是:

ZKStringSerializer$/MODULE$

在 Java 中(我认为)会转化为:

ZKStringSerializer$.MODULE$

但是当我尝试这样做(或任何其他变体)时,它们都无法编译。
编译错误为:

KafkaTopicCreator.java:[16,18] cannot find symbol
symbol: variable ZKStringSerializer$
location: class org.sample.KafkaTopicCreator

我正在使用 kafka_2.9.2-0.8.1.1 和 Java 8。

最佳答案

对于 java 尝试以下,

先导入下面的语句

import kafka.utils.ZKStringSerializer$;

通过以下方式为 ZkClient 创建对象,

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181"
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs

zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}

关于java - 如何在 Java 中创建 Kafka ZKStringSerializer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27006156/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com