gpt4 book ai didi

kafka.utils.ZkUtils类的使用及代码示例

转载 作者:知者 更新时间:2024-03-17 01:50:40 30 4
gpt4 key购买 nike

本文整理了Java中kafka.utils.ZkUtils类的一些代码示例,展示了ZkUtils类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkUtils类的具体详情如下:
包路径:kafka.utils.ZkUtils
类名称:ZkUtils

ZkUtils介绍

暂无

代码示例

代码示例来源:origin: apache/flink

public ZkUtils getZkUtils() {
  LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
  ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
      Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
  return ZkUtils.apply(creator, false);
}

代码示例来源:origin: apache/flink

@Override
public void deleteTestTopic(String topic) {
  ZkUtils zkUtils = getZkUtils();
  try {
    LOG.info("Deleting topic {}", topic);
    ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
      Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
    AdminUtils.deleteTopic(zkUtils, topic);
    zk.close();
  } finally {
    zkUtils.close();
  }
}

代码示例来源:origin: apache/incubator-gobblin

ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
int partitions = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.PARTITION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
int replication = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.REPLICATION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
Properties topicConfig = new Properties(); 
if(AdminUtils.topicExists(zkUtils, topicName)) {
  log.debug("Topic"+topicName+" already Exists with replication: "+replication+" and partitions :"+partitions);
  return;
  AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig);
} catch (RuntimeException e) {
  throw new RuntimeException(e);

代码示例来源:origin: OryxProject/oryx

/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param topic topic to check for existence
 * @return {@code true} if and only if the given topic exists
 */
public static boolean topicExists(String zkServers, String topic) {
 ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
 try {
  return AdminUtils.topicExists(zkUtils, topic);
 } finally {
  zkUtils.close();
 }
}

代码示例来源:origin: linkedin/kafka-monitor

/**
 * @param zkUrl zookeeper connection url
 * @return      number of brokers in this cluster
 */
public static int getBrokerCount(String zkUrl) {
 ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
 try {
  return zkUtils.getAllBrokersInCluster().size();
 } finally {
  zkUtils.close();
 }
}

代码示例来源:origin: OryxProject/oryx

/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param topic topic to delete, if it exists
 */
public static void deleteTopic(String zkServers, String topic) {
 ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
 try {
  if (AdminUtils.topicExists(zkUtils, topic)) {
   log.info("Deleting topic {}", topic);
   AdminUtils.deleteTopic(zkUtils, topic);
   log.info("Deleted Zookeeper topic {}", topic);
  } else {
   log.info("No need to delete topic {} as it does not exist", topic);
  }
 } finally {
  zkUtils.close();
 }
}

代码示例来源:origin: confluentinc/kafka-streams-examples

/**
 * Creates and starts the cluster.
 */
public void start() throws Exception {
 log.debug("Initiating embedded Kafka cluster startup");
 log.debug("Starting a ZooKeeper instance...");
 zookeeper = new ZooKeeperEmbedded();
 log.debug("ZooKeeper instance is running at {}", zookeeper.connectString());
 zkUtils = ZkUtils.apply(
   zookeeper.connectString(),
   30000,
   30000,
   JaasUtils.isZkSecurityEnabled());
 Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper);
 log.debug("Starting a Kafka instance on port {} ...",
   effectiveBrokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
 broker = new KafkaEmbedded(effectiveBrokerConfig);
 log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
   broker.brokerList(), broker.zookeeperConnect());
 Properties schemaRegistryProps = new Properties();
 schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, KAFKASTORE_OPERATION_TIMEOUT_MS);
 schemaRegistryProps.put(SchemaRegistryConfig.DEBUG_CONFIG, KAFKASTORE_DEBUG);
 schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG, KAFKASTORE_INIT_TIMEOUT);
 schemaRegistry = new RestApp(0, zookeeperConnect(), KAFKA_SCHEMAS_TOPIC, AVRO_COMPATIBILITY_TYPE, schemaRegistryProps);
 schemaRegistry.start();
 running = true;
}

代码示例来源:origin: apache/incubator-gobblin

Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount );
props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
ZkClient zkClient = new ZkClient(
  liveZookeeper,
  sessionTimeoutMs,
  ZKStringSerializer$.MODULE$);
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(liveZookeeper), isSecureKafkaCluster);
    AdminUtils.fetchTopicMetadataFromZk(topic,zkUtils);
Assert.assertEquals(metaData.partitionsMetadata().size(), Integer.parseInt(topicPartitionCount));

代码示例来源:origin: apache/drill

public static void createTopicHelper(final String topicName, final int partitions) {
 Properties topicProps = new Properties();
 topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
 topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
 ZkUtils zkUtils = new ZkUtils(zkClient,
   new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
 AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
   topicProps, RackAwareMode.Disabled$.MODULE$);
 org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk =
   AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
 logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
}

代码示例来源:origin: apache/phoenix

@Before
public void setUp() throws IOException, SQLException {
  // setup Zookeeper
  zkServer = new EmbeddedZookeeper();
  String zkConnect = ZKHOST + ":" + zkServer.port();
  zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
  ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
  // setup Broker
  Properties brokerProps = new Properties();
  brokerProps.setProperty("zookeeper.connect", zkConnect);
  brokerProps.setProperty("broker.id", "0");
  brokerProps.setProperty("log.dirs",
    Files.createTempDirectory("kafka-").toAbsolutePath().toString());
  brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
  KafkaConfig config = new KafkaConfig(brokerProps);
  Time mock = new MockTime();
  kafkaServer = TestUtils.createServer(config, mock);
  kafkaServer.startup();
  // create topic
  AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties());
  pConsumer = new PhoenixConsumer();
  
  Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
  conn = DriverManager.getConnection(getUrl(), props);
}

代码示例来源:origin: OryxProject/oryx

/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param topic topic to create (if not already existing)
 * @param partitions number of topic partitions
 * @param topicProperties optional topic config properties
 */
public static void maybeCreateTopic(String zkServers,
                  String topic,
                  int partitions,
                  Properties topicProperties) {
 ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
 try {
  if (AdminUtils.topicExists(zkUtils, topic)) {
   log.info("No need to create topic {} as it already exists", topic);
  } else {
   log.info("Creating topic {} with {} partition(s)", topic, partitions);
   try {
    AdminUtils.createTopic(
      zkUtils, topic, partitions, 1, topicProperties, RackAwareMode.Enforced$.MODULE$);
    log.info("Created topic {}", topic);
   } catch (TopicExistsException re) {
    log.info("Topic {} already exists", topic);
   }
  }
 } finally {
  zkUtils.close();
 }
}

代码示例来源:origin: com.hotels.road/road-kafka-store

@SuppressWarnings({ "rawtypes", "unchecked" })
 private static void verifyTopic(ZkUtils zkUtils, String topic) {
  Set topics = new HashSet();
  topics.add(topic);

  // check # partition and the replication factor
  scala.collection.mutable.Map partitionAssignmentForTopics = zkUtils
    .getPartitionAssignmentForTopics(JavaConversions.asScalaSet(topics).toSeq());
  scala.collection.Map partitionAssignment = (scala.collection.Map) partitionAssignmentForTopics.get(topic).get();

  if (partitionAssignment.size() != 1) {
   throw new RuntimeException(String.format("The schema topic %s should have only 1 partition.", topic));
  }

  // check the retention policy
  Properties prop = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
  String retentionPolicy = prop.getProperty(LogConfig.CleanupPolicyProp());
  if (retentionPolicy == null || "compact".compareTo(retentionPolicy) != 0) {
   throw new RuntimeException(String.format("The retention policy of the schema topic %s must be compact.", topic));
  }
 }
}

代码示例来源:origin: apache/incubator-gobblin

public void provisionTopic(String topic) {
 if (_topicConsumerMap.containsKey(topic)) {
  // nothing to do: return
 } else {
  // provision topic
  AdminUtils.createTopic(ZkUtils.apply(_kafkaServerSuite.getZkClient(), false),
    topic, 1, 1, new Properties());
  List<KafkaServer> servers = new ArrayList<>();
  servers.add(_kafkaServerSuite.getKafkaServer());
  kafka.utils.TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);
  KafkaConsumerSuite consumerSuite = new KafkaConsumerSuite(_kafkaServerSuite.getZkConnectString(), topic);
  _topicConsumerMap.put(topic, consumerSuite);
 }
}

代码示例来源:origin: apache/tajo

public void createTopic(int partitions, int replication, String topic) {
 checkState(started.get(), "not started!");
 ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
 try {
  AdminUtils.createTopic(ZkUtils.apply(zkClient, false), topic, partitions, replication, new Properties(),
    RackAwareMode.Enforced$.MODULE$);
 } finally {
  zkClient.close();
 }
}

代码示例来源:origin: reactor/reactor-kafka

public String createNewTopic(String newTopic, int partitions) {
  ZkUtils zkUtils = new ZkUtils(embeddedKafka.zkClient(), null, false);
  Properties props = new Properties();
  AdminUtils.createTopic(zkUtils, newTopic, partitions, 1, props, null);
  waitForTopic(newTopic, partitions, true);
  return newTopic;
}

代码示例来源:origin: homeaway/stream-registry

private ZkUtils initZkUtils(Properties config) {
  String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
  ZkClient zkClient = new ZkClient(zkConnect);
  zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
  ZkConnection zkConnection = new ZkConnection(zkConnect);
  ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
  return zkUtils;
}

代码示例来源:origin: linkedin/kafka-monitor

ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
try {
 if (AdminUtils.topicExists(zkUtils, topic)) {
  return getPartitionNumForTopic(zkUrl, topic);
 int brokerCount = zkUtils.getAllBrokersInCluster().size();
 int partitionCount = Math.max((int) Math.ceil(brokerCount * partitionToBrokerRatio), minPartitionNum);
  AdminUtils.createTopic(zkUtils, topic, partitionCount, replicationFactor, topicConfig, RackAwareMode.Enforced$.MODULE$);
 } catch (TopicExistsException e) {
  + topicConfig.get(KafkaConfig.MinInSyncReplicasProp()) + " and replication factor of " + replicationFactor + ".");
 zkUtils.close();

代码示例来源:origin: org.apache.kafka/kafka_2.10

public int run(final String[] args, final Properties config) {
  consumerConfig.clear();
  consumerConfig.putAll(config);
    zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
      30000,
      30000,
    allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
      zkUtils.close();

代码示例来源:origin: org.apache.apex/malhar-contrib

/**
 * There is always only one string in zkHost
 * @param zkHost
 * @return
 */
public static Set<String> getBrokers(Set<String> zkHost){
 ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
 Set<String> brokerHosts = new HashSet<String>();
 for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
  brokerHosts.add(b.connectionString());
 }
 zkclient.close();
 return brokerHosts;
}

代码示例来源:origin: linkedin/cruise-control

public BrokerFailureDetector(KafkaCruiseControlConfig config,
               LoadMonitor loadMonitor,
               Queue<Anomaly> anomalies,
               Time time,
               KafkaCruiseControl kafkaCruiseControl) {
 String zkUrl = config.getString(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG);
 ZkConnection zkConnection = new ZkConnection(zkUrl, 30000);
 _zkClient = new ZkClient(zkConnection, 30000, new ZkStringSerializer());
 // Do not support secure ZK at this point.
 _zkUtils = new ZkUtils(_zkClient, zkConnection, false);
 _failedBrokers = new HashMap<>();
 _failedBrokersZkPath = config.getString(KafkaCruiseControlConfig.FAILED_BROKERS_ZK_PATH_CONFIG);
 _loadMonitor = loadMonitor;
 _anomalies = anomalies;
 _time = time;
 _kafkaCruiseControl = kafkaCruiseControl;
 _allowCapacityEstimation = config.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);
}

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com