gpt4 book ai didi

java - 卡夫卡 : Alter number of partitions for a specific topic using java

转载 作者:行者123 更新时间:2023-11-29 06:54:24 24 4
gpt4 key购买 nike

我是 Kafka 的新手,正在使用新的 KafkaProducer 和 KafkaConsumer,版本:0.9.0.1

Java 中是否有任何方法可以在特定主题创建后更改/更新分区数。

我没有使用 zookeeper 创建主题。当发布请求到达时,我的 KafkaProducer 会自动创建主题。

如果这些还不够,我还可以提供更多细节

最佳答案

是的,这是可能的。您必须访问 kafka_2.11-0.9.0.1.jar 中的 AdminUtils scala 类才能添加分区。

AdminUtils 支持主题中的分区数只能增加。你可能需要 kafka_2.11-0.9.0.1.jar, zk-client-0.8.jar, scala-library-2.11.8.jar > 和 scala-parser-combinators_2.11-1.0.4.jar jar 在你的类路径中。

以下部分代码借鉴/启发自 kafka-cloudera 示例。

package org.apache.kafka.examples;

import java.io.Closeable;

import org.I0Itec.zkclient.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode.Enforced$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class Test {

static final Logger logger = LogManager.getLogger();

public Test() {
// TODO Auto-generated constructor stub
}

public static void addPartitions(String zkServers, String topic, int partitions) {

try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

if (AdminUtils.topicExists(zkUtils, topic)) {
logger.info("Altering topic {}", topic);
try {
AdminUtils.addPartitions(zkUtils, topic, partitions, "", true, Enforced$.MODULE$);
logger.info("Topic {} altered with partitions : {}", topic, partitions);
} catch (AdminOperationException aoe) {
logger.info("Error while altering partitions for topic : {}", topic, aoe);
}
} else {
logger.info("Topic {} doesn't exists", topic);
}
}
}

// Just exists for Closeable convenience
private static final class AutoZkClient extends ZkClient implements Closeable {

static int sessionTimeout = 30_000;
static int connectionTimeout = 6_000;

AutoZkClient(String zkServers) {
super(zkServers, sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$);
}
}

public static void main(String[] args) {

addPartitions("localhost:2181", "hello", 20);
}
}

关于java - 卡夫卡 : Alter number of partitions for a specific topic using java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37437301/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com