gpt4 book ai didi

com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils类的使用及代码示例

转载 作者:知者 更新时间:2024-03-15 19:27:31 25 4
gpt4 key购买 nike

本文整理了Java中com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils类的一些代码示例,展示了ZookeeperPathUtils类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZookeeperPathUtils类的具体详情如下:
包路径:com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils
类名称:ZookeeperPathUtils

ZookeeperPathUtils介绍

[英]Storage structure:

/otter
  canal
    cluster
    destinations
      dest1
        running (EPHEMERAL)
        cluster
        client1
          running (EPHEMERAL)
          cluster
          filter
          cursor
          mark
            1
            2
            3

[中]存储结构:

/otter
  canal
    cluster
    destinations
      dest1
        running (EPHEMERAL)
        cluster
        client1
          running (EPHEMERAL)
          cluster
          filter
          cursor
          mark
            1
            2
            3

代码示例

代码示例来源:origin: alibaba/canal

/**
 * Reports whether the client-id node for the given client exists in ZooKeeper.
 *
 * @param clientIdentity supplies the destination and client id used to build the node path
 * @return the result of the existence check on the client-id node path
 * @throws CanalMetaManagerException declared by the signature
 */
public boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
  String clientNodePath =
      ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(), clientIdentity.getClientId());
  boolean subscribed = zkClientx.exists(clientNodePath);
  return subscribed;
}

代码示例来源:origin: alibaba/canal

return new ArrayList<ClientIdentity>();
String path = ZookeeperPathUtils.getDestinationPath(destination);
List<String> childs = null;
try {
for (String child : childs) {
  if (StringUtils.isNumeric(child)) {
    clientIds.add(ZookeeperPathUtils.getClientId(child));
  path = ZookeeperPathUtils.getFilterPath(destination, clientId);
  byte[] bytes = zkClientx.readData(path, true);
  String filter = null;

代码示例来源:origin: alibaba/canal

/**
 * Deletes this destination's server-running node when {@code check()} passes.
 *
 * @return {@code true} if the node was deleted and the exit hook was invoked,
 *         {@code false} if {@code check()} failed and nothing was done
 */
private boolean releaseRunning() {
  if (!check()) {
    return false;
  }

  String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
  zkClient.delete(runningPath);
  mutex.set(false);
  processActiveExit();
  return true;
}

代码示例来源:origin: alibaba/canal

/**
 * Stores a position range as a new persistent-sequential child of the client's batch-mark node
 * and returns the batch id derived from the created node name.
 *
 * @param clientIdentity supplies the destination and client id used to build the mark path
 * @param positionRange  the range to serialize as the node payload
 * @return the batch id parsed from the last path segment of the created node
 * @throws CanalMetaManagerException declared by the signature
 */
public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
  String markPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
    clientIdentity.getClientId());
  byte[] payload = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);

  // The sequential node is created directly under the mark path, hence the trailing separator.
  String sequentialPrefix = markPath + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR;
  String createdPath = zkClientx.createPersistentSequential(sequentialPrefix, payload, true);

  // The last path segment of the created node carries the batch id.
  String nodeName = StringUtils.substringAfterLast(createdPath, ZookeeperPathUtils.ZOOKEEPER_SEPARATOR);
  return ZookeeperPathUtils.getBatchMarkId(nodeName);
}

代码示例来源:origin: alibaba/canal

/**
 * Removes the batch with the given id, which must be the smallest outstanding batch id.
 *
 * @param clientIdentity supplies the destination and client id used to build the mark paths
 * @param batchId        the batch id being acknowledged or rolled back
 * @return the position range that was stored for {@code batchId}, or {@code null} when there are
 *         no batch records or the batch data could not be read
 * @throws CanalMetaManagerException if {@code batchId} is not the smallest outstanding batch id
 */
public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
  String batchsPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
    clientIdentity.getClientId());
  List<String> nodes = zkClientx.getChildren(batchsPath);
  if (CollectionUtils.isEmpty(nodes)) {
    // No batch records at all.
    return null;
  }

  // Find the smallest outstanding batch id.
  List<Long> batchIds = new ArrayList<Long>(nodes.size());
  for (String batchIdString : nodes) {
    batchIds.add(Long.valueOf(batchIdString));
  }
  Long minBatchId = Collections.min(batchIds);
  if (!minBatchId.equals(batchId)) {
    // Acks/rollbacks must be submitted in the order the batch ids were handed out,
    // otherwise data can be lost.
    throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
  }

  // batchId equals the minimum element of batchIds at this point, so it is known to be present;
  // a separate contains() check would be unreachable.
  PositionRange positionRange = getBatch(clientIdentity, batchId);
  if (positionRange != null) {
    String path = ZookeeperPathUtils
      .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
    zkClientx.delete(path);
  }
  return positionRange;
}

代码示例来源:origin: alibaba/canal

/**
 * Tries to create the destination's server-running node as an ephemeral node holding the
 * serialized {@code serverData}. Does nothing when {@code isStart()} is false.
 *
 * On success the local {@code serverData} becomes {@code activeData}, the active-enter hook is
 * fired and the mutex is set to true. If the node already exists, its content is read into
 * {@code activeData} instead. If the parent path is missing, it is created and the attempt is
 * repeated.
 */
private void initRunning() {
  if (!isStart()) {
    return;
  }
  String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
  // Serialize the local server data
  byte[] bytes = JsonUtils.marshalToByte(serverData);
  try {
    mutex.set(false);
    zkClient.create(path, bytes, CreateMode.EPHEMERAL);
    activeData = serverData;
    processActiveEnter();// Fire the active-enter event
    mutex.set(true);
  } catch (ZkNodeExistsException e) {
    bytes = zkClient.readData(path, true);
    if (bytes == null) {// The node no longer exists; retry immediately
      initRunning();
    } else {
      activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
    }
  } catch (ZkNoNodeException e) {
    zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // Try to create the parent node
    initRunning();
  }
}

代码示例来源:origin: alibaba/canal

/**
 * Creates the strategy for one destination, registers ZooKeeper listeners on the destination's
 * cluster root and server-running node, and loads the current state of both.
 *
 * @param destination the destination whose cluster and running nodes are watched
 * @param zkClient    the ZooKeeper client used for subscriptions and reads
 */
public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
  this.destination = destination;
  this.zkClient = zkClient;
  // Re-initializes the cluster list whenever the children of the watched path change.
  childListener = new IZkChildListener() {
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
      initClusters(currentChilds);
    }
  };
  // Tracks the running node: clears the address on deletion, re-initializes on data change.
  dataListener = new IZkDataListener() {
    public void handleDataDeleted(String dataPath) throws Exception {
      runningAddress = null;
    }
    public void handleDataChange(String dataPath, Object data) throws Exception {
      initRunning(data);
    }
  };
  String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
  // Subscribe first, then read the current children.
  this.zkClient.subscribeChildChanges(clusterPath, childListener);
  initClusters(this.zkClient.getChildren(clusterPath));
  String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
  // Same order for the running node: subscribe, then read the current data.
  this.zkClient.subscribeDataChanges(runningPath, dataListener);
  initRunning(this.zkClient.readData(runningPath, true));
}

代码示例来源:origin: alibaba/canal

String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
  zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
    true); // 尝试创建父节点
  initRunning();

代码示例来源:origin: alibaba/canal

/**
 * Registers a client subscription by creating its client-id node and, when the client carries a
 * filter, storing the filter bytes under the client's filter path.
 *
 * @param clientIdentity supplies the destination, client id and optional filter
 * @throws CanalMetaManagerException if the filter cannot be encoded with {@code ENCODE}
 */
public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
  String clientNodePath = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
    clientIdentity.getClientId());
  try {
    zkClientx.createPersistent(clientNodePath, true);
  } catch (ZkNodeExistsException e) {
    // The client node is already there; nothing to do.
  }

  if (!clientIdentity.hasFilter()) {
    return;
  }

  String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
    clientIdentity.getClientId());
  byte[] filterBytes;
  try {
    filterBytes = clientIdentity.getFilter().getBytes(ENCODE);
  } catch (UnsupportedEncodingException e) {
    throw new CanalMetaManagerException(e);
  }

  try {
    zkClientx.createPersistent(filterPath, filterBytes);
  } catch (ZkNodeExistsException e) {
    // A filter node already exists: overwrite its content with the new filter.
    zkClientx.writeData(filterPath, filterBytes);
  }
}

代码示例来源:origin: alibaba/canal

/**
 * Lists every stored batch of a client, ordered by ascending batch id.
 *
 * If a batch node disappears between listing the children and reading its data, the whole
 * listing is retried. The retry is a loop rather than self-recursion, so repeated node changes
 * cannot grow the call stack.
 *
 * @param clientIdentity supplies the destination and client id used to build the mark path
 * @return batch id to position range in ascending id order; an empty map when the mark node is
 *         missing or has no children
 */
public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) {
  String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
    clientIdentity.getClientId());
  while (true) {
    List<String> nodes = null;
    try {
      nodes = zkClientx.getChildren(path);
    } catch (ZkNoNodeException e) {
      // The mark node does not exist; handled below as "no batches".
    }
    if (CollectionUtils.isEmpty(nodes)) {
      return Maps.newHashMap();
    }

    // Collect all batch ids and sort them in ascending order.
    List<Long> batchIds = new ArrayList<Long>(nodes.size());
    for (String batchIdString : nodes) {
      batchIds.add(Long.valueOf(batchIdString));
    }
    Collections.sort(batchIds);

    Map<Long, PositionRange> positionRanges = Maps.newLinkedHashMap();
    boolean snapshotComplete = true;
    for (Long batchId : batchIds) {
      PositionRange result = getBatch(clientIdentity, batchId);
      if (result == null) {
        // A null result means the zk nodes changed underneath us; list again from scratch.
        snapshotComplete = false;
        break;
      }
      positionRanges.put(batchId, result);
    }
    if (snapshotComplete) {
      return positionRanges;
    }
  }
}

代码示例来源:origin: alibaba/canal

releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
logger.info("## stop the canal server[{}:{}]", ip, port);

代码示例来源:origin: alibaba/canal

/**
 * Reads the stored cursor position of a client.
 *
 * @param clientIdentity supplies the destination and client id used to build the cursor path
 * @return the deserialized position, or {@code null} when the read yields null or empty data
 * @throws CanalMetaManagerException declared by the signature
 */
public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
  String cursorPath = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
  byte[] raw = zkClientx.readData(cursorPath, true);
  boolean hasPayload = raw != null && raw.length != 0;
  if (hasPayload) {
    return JsonUtils.unmarshalFromByte(raw, Position.class);
  }
  return null;
}

代码示例来源:origin: alibaba/canal

/**
 * Stores a position range under the mark node for an explicitly given batch id.
 *
 * @param clientIdentity supplies the destination and client id used to build the node path
 * @param positionRange  the range to serialize as the node payload
 * @param batchId        the batch id that names the node
 * @throws CanalMetaManagerException declared by the signature
 */
public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange,
           Long batchId) throws CanalMetaManagerException {
  String batchNodePath = ZookeeperPathUtils.getBatchMarkWithIdPath(
    clientIdentity.getDestination(),
    clientIdentity.getClientId(),
    batchId);
  byte[] payload = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
  zkClientx.createPersistent(batchNodePath, payload, true);
}

代码示例来源:origin: alibaba/canal

/**
 * Builds the ZooKeeper path of a single batch-mark node by filling
 * {@code DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH} with the destination name, the client id
 * and the batch-mark node name.
 *
 * @param destinationName the destination name placed in the pattern
 * @param clientId        the client id, inserted as its plain decimal string
 * @param batchId         the batch id, converted with {@code getBatchMarkNode}
 * @return the formatted path
 */
public static String getBatchMarkWithIdPath(String destinationName, short clientId, Long batchId) {
  // The client id is passed as text, so MessageFormat does not apply number formatting to it.
  Object[] pathArguments = {
    destinationName,
    String.valueOf(clientId),
    getBatchMarkNode(batchId)
  };
  return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH, pathArguments);
}

代码示例来源:origin: com.alibaba.otter/canal.client

/**
 * Creates the strategy for one destination, registers ZooKeeper listeners on the destination's
 * cluster root and server-running node, and loads the current state of both.
 *
 * @param destination the destination whose cluster and running nodes are watched
 * @param zkClient    the ZooKeeper client used for subscriptions and reads
 */
public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
  this.zkClient = zkClient;
  // Re-initializes the cluster list whenever the children of the watched path change.
  childListener = new IZkChildListener() {
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
      initClusters(currentChilds);
    }
  };
  // Tracks the running node: clears the address on deletion, re-initializes on data change.
  dataListener = new IZkDataListener() {
    public void handleDataDeleted(String dataPath) throws Exception {
      runningAddress = null;
    }
    public void handleDataChange(String dataPath, Object data) throws Exception {
      initRunning(data);
    }
  };
  String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
  // Subscribe first, then read the current children.
  this.zkClient.subscribeChildChanges(clusterPath, childListener);
  initClusters(this.zkClient.getChildren(clusterPath));
  String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
  // Same order for the running node: subscribe, then read the current data.
  this.zkClient.subscribeDataChanges(runningPath, dataListener);
  initRunning(this.zkClient.readData(runningPath, true));
}

代码示例来源:origin: com.alibaba.otter/canal.client

String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
  zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
    true); // 尝试创建父节点
  initRunning();

代码示例来源:origin: alibaba/canal

/**
 * Returns the position range stored for the largest batch id of a client.
 *
 * If the newest batch node disappears between listing the children and reading its data, the
 * lookup is retried. The retry is a loop rather than self-recursion, so repeated node changes
 * cannot grow the call stack.
 *
 * @param clientIdentity supplies the destination and client id used to build the mark path
 * @return the position range of the largest batch id, or {@code null} when the mark node is
 *         missing or has no children
 */
public PositionRange getLastestBatch(ClientIdentity clientIdentity) {
  String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
    clientIdentity.getClientId());
  while (true) {
    List<String> nodes = null;
    try {
      nodes = zkClientx.getChildren(path);
    } catch (ZkNoNodeException e) {
      // The mark node does not exist; handled below as "no batches".
    }
    if (CollectionUtils.isEmpty(nodes)) {
      return null;
    }

    // Find the largest batch id.
    List<Long> batchIds = new ArrayList<Long>(nodes.size());
    for (String batchIdString : nodes) {
      batchIds.add(Long.valueOf(batchIdString));
    }
    Long maxBatchId = Collections.max(batchIds);

    PositionRange result = getBatch(clientIdentity, maxBatchId);
    if (result != null) {
      return result;
    }
    // A null result means the zk nodes changed underneath us; look the newest batch up again.
  }
}

代码示例来源:origin: alibaba/canal

public void start() throws Throwable {
  logger.info("## start the canal server[{}:{}]", ip, port);
  final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
  initCid(path);
  if (zkclientx != null) {

代码示例来源:origin: alibaba/canal

/**
 * Writes the given position to the client's cursor node, creating the node when it does not
 * exist yet.
 *
 * @param clientIdentity supplies the destination and client id used to build the cursor path
 * @param position       the position to serialize and store
 * @throws CanalMetaManagerException declared by the signature
 */
public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
  String cursorPath = ZookeeperPathUtils.getCursorPath(
    clientIdentity.getDestination(),
    clientIdentity.getClientId());
  byte[] payload = JsonUtils.marshalToByte(position, SerializerFeature.WriteClassName);
  try {
    zkClientx.writeData(cursorPath, payload);
  } catch (ZkNoNodeException e) {
    // First write: the cursor node is missing, so create it together with the data.
    zkClientx.createPersistent(cursorPath, payload, true);
  }
}

代码示例来源:origin: alibaba/canal

/**
 * Reads the position range stored for one batch id.
 *
 * @param clientIdentity supplies the destination and client id used to build the node path
 * @param batchId        the batch id whose node is read
 * @return the deserialized position range, or {@code null} when the read yields no data
 * @throws CanalMetaManagerException declared by the signature
 */
public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
  String batchNodePath = ZookeeperPathUtils.getBatchMarkWithIdPath(
    clientIdentity.getDestination(),
    clientIdentity.getClientId(),
    batchId);
  byte[] raw = zkClientx.readData(batchNodePath, true);
  if (raw != null) {
    return JsonUtils.unmarshalFromByte(raw, PositionRange.class);
  }
  return null;
}

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com