- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
本文整理了Java中kafka.utils.ZKGroupTopicDirs.consumerOffsetDir()
方法的一些代码示例,展示了ZKGroupTopicDirs.consumerOffsetDir()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZKGroupTopicDirs.consumerOffsetDir()
方法的具体详情如下:
包路径:kafka.utils.ZKGroupTopicDirs
类名称:ZKGroupTopicDirs
方法名:consumerOffsetDir
暂无
代码示例来源:origin: apache/flink
/**
 * Reads the offset stored in ZooKeeper for one partition of a topic, as seen by a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}. Before reading,
 * the path is passed through {@code newNamespaceAwareEnsurePath(...).ensure(...)}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @return the parsed offset, or {@code null} if the node holds no data, holds an empty string,
 *         or holds text that is not a valid {@code long} (the latter is logged as an error)
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final byte[] rawOffset = curatorClient.getData().forPath(offsetNodePath);
    if (rawOffset == null) {
        return null;
    }

    final String offsetText = new String(rawOffset, ConfigConstants.DEFAULT_CHARSET);
    if (offsetText.isEmpty()) {
        return null;
    }

    try {
        return Long.valueOf(offsetText);
    } catch (NumberFormatException e) {
        // A malformed value is reported and treated the same as "no offset stored".
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, offsetText);
        return null;
    }
}
}
代码示例来源:origin: apache/flink
/**
 * Writes an offset to the ZooKeeper node for one partition of a topic, for a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}; the offset is
 * stored as its decimal string encoded with {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @param offset offset value to store
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final String offsetText = String.valueOf(offset);
    final byte[] payload = offsetText.getBytes(ConfigConstants.DEFAULT_CHARSET);
    curatorClient.setData().forPath(offsetNodePath, payload);
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11
/**
 * Writes an offset to the ZooKeeper node for one partition of a topic, for a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}; the offset is
 * stored as its decimal string encoded with {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @param offset offset value to store
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final String offsetText = String.valueOf(offset);
    final byte[] payload = offsetText.getBytes(ConfigConstants.DEFAULT_CHARSET);
    curatorClient.setData().forPath(offsetNodePath, payload);
}
代码示例来源:origin: org.opendaylight.centinel/centinel-laas
/**
 * Pushes the given offsets to ZooKeeper for a consumer group, one node per topic partition.
 *
 * <p>Each offset is written as a decimal string to
 * {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition} via
 * {@code ZkUtils.updatePersistentPath}. The ZooKeeper client is opened for the duration of the
 * call and closed afterwards by the try-with-resources statement.
 *
 * @param zookeeperHosts
 *            ZooKeeper hosts, e.g. localhost:2181; for multiple hosts use
 *            host1:port1[,host2:port2,...]
 * @param groupID
 *            consumer group to update
 * @param offsets
 *            mapping of (topic and) partition to the offset to push to ZooKeeper
 */
public void createOffsets(String zookeeperHosts, String groupID, Map<TopicAndPartition, Long> offsets) {
    try (SuperZkClient zkClient = new SuperZkClient(zookeeperHosts)) {
        for (Map.Entry<TopicAndPartition, Long> offsetEntry : offsets.entrySet()) {
            final TopicAndPartition target = offsetEntry.getKey();
            final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupID, target.topic());
            final int partitionId = target.partition();
            final long offsetValue = offsetEntry.getValue();

            final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partitionId;
            final String offsetText = String.valueOf(offsetValue);
            ZkUtils.updatePersistentPath(zkClient, offsetNodePath, offsetText);
        }
    }
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8
/**
 * Writes an offset to the ZooKeeper node for one partition of a topic, for a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}; the offset is
 * stored as its decimal string encoded with {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @param offset offset value to store
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final String offsetText = String.valueOf(offset);
    final byte[] payload = offsetText.getBytes(ConfigConstants.DEFAULT_CHARSET);
    curatorClient.setData().forPath(offsetNodePath, payload);
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10
/**
 * Writes an offset to the ZooKeeper node for one partition of a topic, for a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}; the offset is
 * stored as its decimal string encoded with {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @param offset offset value to store
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final String offsetText = String.valueOf(offset);
    final byte[] payload = offsetText.getBytes(ConfigConstants.DEFAULT_CHARSET);
    curatorClient.setData().forPath(offsetNodePath, payload);
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8
/**
 * Reads the offset stored in ZooKeeper for one partition of a topic, as seen by a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}. Before reading,
 * the path is passed through {@code newNamespaceAwareEnsurePath(...).ensure(...)}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @return the parsed offset, or {@code null} if the node holds no data, holds an empty string,
 *         or holds text that is not a valid {@code long} (the latter is logged as an error)
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final byte[] rawOffset = curatorClient.getData().forPath(offsetNodePath);
    if (rawOffset == null) {
        return null;
    }

    final String offsetText = new String(rawOffset, ConfigConstants.DEFAULT_CHARSET);
    if (offsetText.isEmpty()) {
        return null;
    }

    try {
        return Long.valueOf(offsetText);
    } catch (NumberFormatException e) {
        // A malformed value is reported and treated the same as "no offset stored".
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, offsetText);
        return null;
    }
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10
/**
 * Reads the offset stored in ZooKeeper for one partition of a topic, as seen by a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}. Before reading,
 * the path is passed through {@code newNamespaceAwareEnsurePath(...).ensure(...)}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @return the parsed offset, or {@code null} if the node holds no data, holds an empty string,
 *         or holds text that is not a valid {@code long} (the latter is logged as an error)
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final byte[] rawOffset = curatorClient.getData().forPath(offsetNodePath);
    if (rawOffset == null) {
        return null;
    }

    final String offsetText = new String(rawOffset, ConfigConstants.DEFAULT_CHARSET);
    if (offsetText.isEmpty()) {
        return null;
    }

    try {
        return Long.valueOf(offsetText);
    } catch (NumberFormatException e) {
        // A malformed value is reported and treated the same as "no offset stored".
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, offsetText);
        return null;
    }
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11
/**
 * Reads the offset stored in ZooKeeper for one partition of a topic, as seen by a consumer group.
 *
 * <p>The node path is {@code ZKGroupTopicDirs.consumerOffsetDir() + "/" + partition}. Before reading,
 * the path is passed through {@code newNamespaceAwareEnsurePath(...).ensure(...)}.
 *
 * @param curatorClient Curator client used for all ZooKeeper calls
 * @param groupId consumer group id used to build the offset directory
 * @param topic topic name used to build the offset directory
 * @param partition partition number, appended to the offset directory as the node name
 * @return the parsed offset, or {@code null} if the node holds no data, holds an empty string,
 *         or holds text that is not a valid {@code long} (the latter is logged as an error)
 * @throws Exception if any of the ZooKeeper client calls fail
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String offsetNodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;

    curatorClient
        .newNamespaceAwareEnsurePath(offsetNodePath)
        .ensure(curatorClient.getZookeeperClient());

    final byte[] rawOffset = curatorClient.getData().forPath(offsetNodePath);
    if (rawOffset == null) {
        return null;
    }

    final String offsetText = new String(rawOffset, ConfigConstants.DEFAULT_CHARSET);
    if (offsetText.isEmpty()) {
        return null;
    }

    try {
        return Long.valueOf(offsetText);
    } catch (NumberFormatException e) {
        // A malformed value is reported and treated the same as "no offset stored".
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, offsetText);
        return null;
    }
}
}
本文整理了Java中kafka.utils.ZKGroupTopicDirs.consumerOffsetDir()方法的一些代码示例,展示了ZKGroupTopicDirs.consumerOffs
本文整理了Java中kafka.utils.ZKGroupTopicDirs.()方法的一些代码示例,展示了ZKGroupTopicDirs.()的具体用法。这些代码示例主要来源于Github/Sta
我是一名优秀的程序员,十分优秀!