gpt4 book ai didi

kafka.utils.ZKGroupTopicDirs.consumerOffsetDir()方法的使用及代码示例

转载 作者:知者 更新时间:2024-03-15 04:19:31 26 4
gpt4 key购买 nike

本文整理了Java中kafka.utils.ZKGroupTopicDirs.consumerOffsetDir()方法的一些代码示例,展示了ZKGroupTopicDirs.consumerOffsetDir()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZKGroupTopicDirs.consumerOffsetDir()方法的具体详情如下:
包路径:kafka.utils.ZKGroupTopicDirs
类名称:ZKGroupTopicDirs
方法名:consumerOffsetDir

ZKGroupTopicDirs.consumerOffsetDir介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Reads the offset stored in ZooKeeper for one partition of a topic and consumer group.
 *
 * <p>The node read is {@code <consumerOffsetDir>/<partition>}, where the directory comes from
 * {@link ZKGroupTopicDirs#consumerOffsetDir()}. Curator's ensure-path is invoked on that node
 * before its data is fetched.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group whose offset is looked up
 * @param topic topic name
 * @param partition partition number
 * @return the parsed offset, or {@code null} when the node holds no data, holds an empty
 *         string, or holds text that is not a valid {@code long} (the latter is logged as an error)
 * @throws Exception if any of the Curator calls fails
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
  final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
  final String offsetPath = groupTopicDirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(offsetPath).ensure(curatorClient.getZookeeperClient());

  final byte[] raw = curatorClient.getData().forPath(offsetPath);
  if (raw == null) {
    return null;
  }

  final String offsetText = new String(raw, ConfigConstants.DEFAULT_CHARSET);
  if (offsetText.isEmpty()) {
    return null;
  }

  try {
    return Long.valueOf(offsetText);
  } catch (NumberFormatException e) {
    // A non-numeric payload is treated the same as "no offset", but is reported.
    LOG.error(
        "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
        groupId, topic, partition, offsetText);
    return null;
  }
}
}

代码示例来源:origin: apache/flink

/**
 * Writes an offset to ZooKeeper for one partition of a topic and consumer group.
 *
 * <p>The node written is {@code <consumerOffsetDir>/<partition>}; Curator's ensure-path is
 * invoked on it first. The offset is stored as its decimal string form, encoded with
 * {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group whose offset is written
 * @param topic topic name
 * @param partition partition number
 * @param offset offset value to store
 * @throws Exception if any of the Curator calls fails
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
  final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
  final String offsetPath = groupTopicDirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(offsetPath).ensure(curatorClient.getZookeeperClient());

  final String offsetText = String.valueOf(offset);
  final byte[] payload = offsetText.getBytes(ConfigConstants.DEFAULT_CHARSET);
  curatorClient.setData().forPath(offsetPath, payload);
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11

/**
 * Stores {@code offset} in the ZooKeeper node {@code <consumerOffsetDir>/<partition>} of the
 * given consumer group and topic.
 *
 * <p>Curator's ensure-path is invoked on the node before the write. The value is written as
 * the decimal text of the offset, encoded with {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group identifier
 * @param topic topic name
 * @param partition partition number
 * @param offset offset value to store
 * @throws Exception if any of the Curator calls fails
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
  ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(groupId, topic);
  String znode = dirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(znode).ensure(curatorClient.getZookeeperClient());

  byte[] encodedOffset = String.valueOf(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
  curatorClient.setData().forPath(znode, encodedOffset);
}

代码示例来源:origin: org.opendaylight.centinel/centinel-laas

/**
 * Pushes the given offsets to ZooKeeper for a consumer group.
 *
 * <p>For every entry, the value is written as decimal text to the persistent path
 * {@code <consumerOffsetDir>/<partition>} via {@code ZkUtils.updatePersistentPath}. The
 * ZooKeeper client is opened for the duration of the call and closed afterwards.
 *
 * @param zookeeperHosts
 *            ZooKeeper connection string, e.g. {@code localhost:2181}; for several
 *            servers use {@code host1:port1[,host2:port2,...]}
 * @param groupID
 *            consumer group to update
 * @param offsets
 *            mapping of (topic and) partition to the offset to push to ZooKeeper
 */
public void createOffsets(String zookeeperHosts, String groupID, Map<TopicAndPartition, Long> offsets) {
  try (SuperZkClient zkClient = new SuperZkClient(zookeeperHosts)) {
    for (Map.Entry<TopicAndPartition, Long> offsetEntry : offsets.entrySet()) {
      final TopicAndPartition target = offsetEntry.getKey();
      final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupID, target.topic());
      final int partitionId = target.partition();
      // Unbox before building the path, as the original did.
      final long offsetValue = offsetEntry.getValue();

      final String offsetPath = groupTopicDirs.consumerOffsetDir() + "/" + partitionId;
      final String offsetText = Long.toString(offsetValue);
      ZkUtils.updatePersistentPath(zkClient, offsetPath, offsetText);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8

/**
 * Persists a consumer offset in ZooKeeper.
 *
 * <p>The target node is {@code <consumerOffsetDir>/<partition>} for the given group and topic.
 * Curator's ensure-path is invoked on the node, then the offset's decimal string (encoded with
 * {@code ConfigConstants.DEFAULT_CHARSET}) is set as the node data.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group identifier
 * @param topic topic name
 * @param partition partition number
 * @param offset offset value to store
 * @throws Exception if any of the Curator calls fails
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
  final String partitionPath =
      new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(partitionPath).ensure(curatorClient.getZookeeperClient());

  final byte[] offsetBytes = String.valueOf(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
  curatorClient.setData().forPath(partitionPath, offsetBytes);
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10

/**
 * Sets the ZooKeeper-stored offset for a single partition of a topic within a consumer group.
 *
 * <p>Writes to {@code <consumerOffsetDir>/<partition>} after invoking Curator's ensure-path on
 * that node. The stored data is the decimal text of {@code offset}, encoded with
 * {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group identifier
 * @param topic topic name
 * @param partition partition number
 * @param offset offset value to store
 * @throws Exception if any of the Curator calls fails
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
  ZKGroupTopicDirs offsetDirs = new ZKGroupTopicDirs(groupId, topic);
  String nodePath = offsetDirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(nodePath).ensure(curatorClient.getZookeeperClient());

  String decimalOffset = String.valueOf(offset);
  byte[] nodeData = decimalOffset.getBytes(ConfigConstants.DEFAULT_CHARSET);
  curatorClient.setData().forPath(nodePath, nodeData);
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8

/**
 * Looks up the offset stored in ZooKeeper for a partition of a topic and consumer group.
 *
 * <p>Reads the node {@code <consumerOffsetDir>/<partition>} after invoking Curator's
 * ensure-path on it, and parses its content as a {@code long}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group identifier
 * @param topic topic name
 * @param partition partition number
 * @return the stored offset, or {@code null} if the node data is {@code null}, empty, or not
 *         parseable as a {@code long} (a parse failure is logged as an error)
 * @throws Exception if any of the Curator calls fails
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
  ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(groupId, topic);
  String znode = dirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(znode).ensure(curatorClient.getZookeeperClient());

  byte[] payload = curatorClient.getData().forPath(znode);

  // Missing data and empty data are both reported to the caller as "no offset".
  String storedText = (payload == null) ? "" : new String(payload, ConfigConstants.DEFAULT_CHARSET);
  if (storedText.isEmpty()) {
    return null;
  }

  Long parsed;
  try {
    parsed = Long.valueOf(storedText);
  } catch (NumberFormatException e) {
    LOG.error(
        "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
        groupId, topic, partition, storedText);
    parsed = null;
  }
  return parsed;
}
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10

/**
 * Fetches the offset recorded in ZooKeeper for the given group, topic and partition.
 *
 * <p>The node consulted is {@code <consumerOffsetDir>/<partition>}; Curator's ensure-path is
 * invoked on it before its data is read.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group identifier
 * @param topic topic name
 * @param partition partition number
 * @return the offset parsed from the node data, or {@code null} when the data is absent,
 *         empty, or malformed (malformed data is logged as an error)
 * @throws Exception if any of the Curator calls fails
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
  final String nodePath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(nodePath).ensure(curatorClient.getZookeeperClient());

  final byte[] nodeData = curatorClient.getData().forPath(nodePath);
  if (nodeData != null) {
    final String decoded = new String(nodeData, ConfigConstants.DEFAULT_CHARSET);
    if (!decoded.isEmpty()) {
      try {
        return Long.valueOf(decoded);
      } catch (NumberFormatException e) {
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, decoded);
      }
    }
  }

  // Reached for null data, empty data, and unparseable data alike.
  return null;
}
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11

/**
 * Retrieves the offset that ZooKeeper holds for one partition of a topic and consumer group.
 *
 * <p>The data of node {@code <consumerOffsetDir>/<partition>} is read (after Curator's
 * ensure-path has been invoked on it) and interpreted as a decimal {@code long}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId consumer group identifier
 * @param topic topic name
 * @param partition partition number
 * @return the offset, or {@code null} if the node has no data, has empty data, or contains
 *         text that cannot be parsed as a {@code long} (logged as an error)
 * @throws Exception if any of the Curator calls fails
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
  final ZKGroupTopicDirs offsetDirs = new ZKGroupTopicDirs(groupId, topic);
  final String partitionPath = offsetDirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(partitionPath).ensure(curatorClient.getZookeeperClient());

  final byte[] bytes = curatorClient.getData().forPath(partitionPath);
  if (bytes == null) {
    return null;
  }

  final String offsetString = new String(bytes, ConfigConstants.DEFAULT_CHARSET);
  if (offsetString.isEmpty()) {
    return null;
  }

  try {
    return Long.valueOf(offsetString);
  } catch (NumberFormatException e) {
    LOG.error(
        "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
        groupId, topic, partition, offsetString);
    return null;
  }
}
}

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com