Usage and code examples of the org.apache.samza.zk.ZkBarrierForVersionUpgrade class

Reposted. Author: 知者. Updated: 2024-03-14 18:57:31
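This article collects code examples for the Java class org.apache.samza.zk.ZkBarrierForVersionUpgrade and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be a useful reference. Details of the ZkBarrierForVersionUpgrade class:
Package path: org.apache.samza.zk.ZkBarrierForVersionUpgrade
Class name: ZkBarrierForVersionUpgrade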

Introduction to ZkBarrierForVersionUpgrade

ZkBarrierForVersionUpgrade is an implementation of a distributed barrier that guarantees the expected barrier size and barrier participants match before the barrier is marked as complete. It also allows the caller to expire the barrier. This implementation is specifically tailored to barrier support during JobModel version upgrades. The participant responsible for the version upgrade starts the barrier by invoking #create(String, List). Each participant in the list then joins the new barrier. When all listed participants have called #join(String, String), the creator marks the barrier as org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#DONE, which signals the end of the barrier. The creator can also expire the barrier by invoking #expire(String); this marks the barrier as org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#TIMED_OUT and indicates to every participant that it is no longer valid. The lifecycle of a barrier is as follows:

                          When expected participants join
Leader ---> NEW ----------------------------------------> DONE
             |            barrier within barrierTimeOut.
             |
             |
             |            When expected participants don't
             | -----------------------------------------> TIMED_OUT
                          join barrier within barrierTimeOut.
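The lifecycle above can be sketched as a small in-memory state machine. This is only an illustrative model, not Samza's ZooKeeper-backed implementation: the class name BarrierLifecycleSketch and its methods are hypothetical, and ignoring joins that arrive after a terminal state is an assumption of this model.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** In-memory model of the barrier lifecycle; hypothetical, not Samza code. */
final class BarrierLifecycleSketch {
    enum State { NEW, DONE, TIMED_OUT }

    private final Set<String> expected;
    private final Set<String> joined = new LinkedHashSet<>();
    private State state = State.NEW;

    BarrierLifecycleSketch(List<String> expectedParticipants) {
        this.expected = new LinkedHashSet<>(expectedParticipants);
    }

    /** Records a participant; moves NEW -> DONE once the joined set equals the expected set. */
    synchronized State join(String participantId) {
        if (state != State.NEW) {
            return state; // assumption of this model: terminal states ignore late joins
        }
        joined.add(participantId);
        if (joined.equals(expected)) {
            state = State.DONE;
        }
        return state;
    }

    /** Moves NEW -> TIMED_OUT; a barrier that is already DONE stays DONE. */
    synchronized State expire() {
        if (state == State.NEW) {
            state = State.TIMED_OUT;
        }
        return state;
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        BarrierLifecycleSketch done = new BarrierLifecycleSketch(Arrays.asList("p1", "p2"));
        done.join("p1");
        done.join("p2");
        System.out.println(done.expire()); // DONE: expiring a completed barrier changes nothing

        BarrierLifecycleSketch timedOut = new BarrierLifecycleSketch(Arrays.asList("p1", "p2", "p3"));
        timedOut.join("p1");
        timedOut.join("p2");
        System.out.println(timedOut.expire()); // TIMED_OUT: p3 never joined
    }
}
```

Both terminal states are final in this model, which mirrors the two arrows leaving NEW in the diagram.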

The caller can listen to events associated with the barrier by registering a ZkBarrierListener.

Zk Tree Reference:
/barrierRoot/
 |
 |- barrier_{version1}/
 |   |- barrier_state/
 |   |  ([NEW|DONE|TIMED_OUT])
 |   |- barrier_participants/
 |   |   |- {id1}
 |   |   |- {id2}
 |   |   |- ...
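To make the tree layout concrete, the following sketch builds the znode paths for one barrier version by plain string concatenation. The class BarrierPathsSketch and its method names are hypothetical helpers written for this article; only the node names (barrier_{version}, barrier_state, barrier_participants) come from the reference above.

```java
/** Builds znode paths that follow the tree reference; helper names are illustrative, not Samza API. */
final class BarrierPathsSketch {
    private final String barrierRoot;

    BarrierPathsSketch(String barrierRoot) {
        this.barrierRoot = barrierRoot;
    }

    /** Path of the node for one barrier version, e.g. /barrierRoot/barrier_1. */
    String barrierPath(String version) {
        return barrierRoot + "/barrier_" + version;
    }

    /** Path of the node that holds NEW, DONE, or TIMED_OUT. */
    String statePath(String version) {
        return barrierPath(version) + "/barrier_state";
    }

    /** Parent node under which each participant registers its id. */
    String participantsPath(String version) {
        return barrierPath(version) + "/barrier_participants";
    }

    /** Path of a single participant's node. */
    String participantPath(String version, String participantId) {
        return participantsPath(version) + "/" + participantId;
    }

    public static void main(String[] args) {
        BarrierPathsSketch paths = new BarrierPathsSketch("/barrierRoot");
        System.out.println(paths.statePath("1"));             // /barrierRoot/barrier_1/barrier_state
        System.out.println(paths.participantPath("1", "p1")); // /barrierRoot/barrier_1/barrier_participants/p1
    }
}
```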

Code examples

Code example source: apache/samza

// Excerpt from an anonymous Comparator<String>; the enclosing call is not shown in the source.
@Override
public int compare(String o1, String o2) {
 // barrier's name format is barrier_<num>
 return ZkBarrierForVersionUpgrade.getVersion(o1) - ZkBarrierForVersionUpgrade.getVersion(o2);
}
});
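The comparator above orders barrier nodes by the number in their name rather than by the name string. A self-contained sketch of that idea follows; parseVersion is a hypothetical stand-in written for this example (the body of Samza's getVersion is not shown in the source), and it assumes names of the form barrier_<num>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Sorts barrier node names numerically by version; hypothetical, not Samza code. */
final class BarrierVersionSortSketch {
    private static final String PREFIX = "barrier_";

    /** Hypothetical stand-in for getVersion: parses the number after "barrier_". */
    static int parseVersion(String barrierName) {
        if (!barrierName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a barrier name: " + barrierName);
        }
        return Integer.parseInt(barrierName.substring(PREFIX.length()));
    }

    /** Returns a copy of the names ordered from the lowest version to the highest. */
    static List<String> sortByVersion(List<String> barrierNames) {
        List<String> sorted = new ArrayList<>(barrierNames);
        sorted.sort(Comparator.comparingInt(BarrierVersionSortSketch::parseVersion));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("barrier_10", "barrier_9", "barrier_200");
        // A plain string sort would put barrier_10 and barrier_200 before barrier_9.
        System.out.println(sortByVersion(names)); // [barrier_9, barrier_10, barrier_200]
    }
}
```

Comparator.comparingInt is used here instead of subtraction because it cannot overflow, although for small non-negative version numbers both give the same order.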

Code example source: apache/samza

// Both processors use the same barrierId but separate ZkUtils instances.
ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
// processor1 creates the barrier, then both processors join it.
processor1Barrier.create(BARRIER_VERSION, processors);
processor1Barrier.join(BARRIER_VERSION, "p1");
processor2Barrier.join(BARRIER_VERSION, "p2");
// The creator expires the barrier; "p3" then attempts to join after expiry.
processor1Barrier.expire(BARRIER_VERSION);
processor1Barrier.join(BARRIER_VERSION, "p3");

Code example source: apache/samza

@Test
public void testZkBarrierForVersionUpgrade() {
 String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
 List<String> processors = ImmutableList.of("p1", "p2");
 CountDownLatch latch = new CountDownLatch(2);
 TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.DONE);
 ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
 ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 processor1Barrier.create(BARRIER_VERSION, processors);
 State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state");
 assertEquals(State.NEW, barrierState);
 processor1Barrier.join(BARRIER_VERSION, "p1");
 processor2Barrier.join(BARRIER_VERSION, "p2");
 boolean result = false;
 try {
  result = latch.await(10000, TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 assertTrue("Barrier failed to complete within test timeout.", result);
 List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_1/barrier_participants");
 barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state");
 assertEquals(State.DONE, barrierState);
 assertNotNull(children);
 assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size());
 assertEquals("Unexpected barrier state. Didn't find the expected members.", processors, children);
}

Code example source: apache/samza

@Test
public void testCleanUpZkBarrierVersion() {
 String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
 zkUtils.getZkClient().createPersistent(root, true);
 ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null, null);
 for (int i = 200; i < 210; i++) {
  barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c")));
 }
 zkUtils.deleteOldBarrierVersions(5);
 List<String> zNodeIds = zkUtils.getZkClient().getChildren(root);
 Collections.sort(zNodeIds);
 Assert.assertEquals(Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209"),
   zNodeIds);
}
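The test above creates barriers 200 through 209, keeps the five newest, and expects 205 through 209 to remain. The selection step can be sketched in memory as follows. This is a hypothetical helper, not the body of deleteOldBarrierVersions (which is not shown in the source); it assumes names of the form barrier_<num> and only computes which names would be removed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Picks the barrier names that fall outside the newest N versions; hypothetical, not Samza code. */
final class BarrierRetentionSketch {
    /** Returns the names to delete so that only the {@code keep} highest versions remain. */
    static List<String> namesToDelete(List<String> barrierNames, int keep) {
        List<String> sorted = new ArrayList<>(barrierNames);
        // Order numerically by the suffix after "barrier_", oldest version first.
        sorted.sort(Comparator.comparingInt(
                (String name) -> Integer.parseInt(name.substring("barrier_".length()))));
        int deleteCount = Math.max(0, sorted.size() - keep);
        return new ArrayList<>(sorted.subList(0, deleteCount));
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        for (int i = 200; i < 210; i++) {
            names.add("barrier_" + i);
        }
        System.out.println(namesToDelete(names, 5));
        // [barrier_200, barrier_201, barrier_202, barrier_203, barrier_204]
    }
}
```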

Code example source: org.apache.samza/samza-core

@Override
public void onBarrierCreated(String version) {
 // Start the timer for rebalancing
 startTime = System.nanoTime();
 metrics.barrierCreation.inc();
 if (leaderElector.amILeader()) {
  debounceTimer.scheduleAfterDebounceTime(barrierAction, (new ZkConfig(config)).getZkBarrierTimeoutMs(), () -> barrier.expire(version));
 }
}
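The callback above schedules barrier.expire(version) to run once the barrier timeout elapses. The same pattern can be sketched with the JDK's ScheduledExecutorService in place of Samza's debounce timer. Everything in this sketch is hypothetical (class name, method names, the single-use design, and the omission of the leader check); it only illustrates racing a delayed expiry against completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Single-use barrier whose expiry is scheduled at creation time; hypothetical, not Samza code. */
final class BarrierTimeoutSketch {
    enum State { NEW, DONE, TIMED_OUT }

    private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
    private final CountDownLatch terminal = new CountDownLatch(1);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Schedules the expiry, mirroring the delayed barrier.expire(version) call in the excerpt. */
    void onBarrierCreated(long timeoutMs) {
        scheduler.schedule(() -> { transition(State.TIMED_OUT); }, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Called when every expected participant has joined. */
    void onAllParticipantsJoined() {
        transition(State.DONE);
    }

    /** Only the first transition out of NEW wins; the scheduler is stopped afterwards. */
    private void transition(State target) {
        if (state.compareAndSet(State.NEW, target)) {
            terminal.countDown();
            scheduler.shutdownNow(); // drops a still-pending expiry task, if any
        }
    }

    /** Waits until the barrier leaves NEW or the wait elapses, then returns the current state. */
    State awaitTerminal(long maxWaitMs) {
        try {
            terminal.await(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return state.get();
    }

    public static void main(String[] args) {
        BarrierTimeoutSketch expired = new BarrierTimeoutSketch();
        expired.onBarrierCreated(50);          // nobody joins, so the expiry fires
        System.out.println(expired.awaitTerminal(5_000));

        BarrierTimeoutSketch completed = new BarrierTimeoutSketch();
        completed.onBarrierCreated(60_000);    // long timeout
        completed.onAllParticipantsJoined();   // completion wins the race
        System.out.println(completed.awaitTerminal(5_000));
    }
}
```

Using compareAndSet for both transitions means a late expiry cannot overwrite DONE, which matches the behavior the "discard events after DONE" test later in this article checks for.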

Code example source: org.apache.samza/samza-core_2.11

ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

Code example source: org.apache.samza/samza-core_2.12

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

Code example source: org.apache.samza/samza-core

barrier.create(nextJMVersion, currentProcessorIds);

Code example source: apache/samza

@Test
public void testShouldDiscardBarrierUpdateEventsAfterABarrierIsMarkedAsDone() {
 String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
 List<String> processors = ImmutableList.of("p1", "p2");
 CountDownLatch latch = new CountDownLatch(2);
 TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.DONE);
 ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
 ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 processor1Barrier.create(BARRIER_VERSION, processors);
 processor1Barrier.join(BARRIER_VERSION, "p1");
 processor2Barrier.join(BARRIER_VERSION, "p2");
 boolean result = false;
 try {
  result = latch.await(10000, TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 assertTrue("Barrier Timeout test failed to complete within test timeout.", result);
 processor1Barrier.expire(BARRIER_VERSION);
 State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state");
 assertEquals(State.DONE, barrierState);
}

Code example source: apache/samza

@Test
public void testZkBarrierForVersionUpgradeWithTimeOut() {
 String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
 List<String> processors = ImmutableList.of("p1", "p2", "p3");
 CountDownLatch latch = new CountDownLatch(2);
 TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.TIMED_OUT);
 ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
 ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 processor1Barrier.create(BARRIER_VERSION, processors);
 processor1Barrier.join(BARRIER_VERSION, "p1");
 processor2Barrier.join(BARRIER_VERSION, "p2");
 processor1Barrier.expire(BARRIER_VERSION);
 boolean result = false;
 try {
  result = latch.await(10000, TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 assertTrue("Barrier Timeout test failed to complete within test timeout.", result);
 List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_1/barrier_participants");
 State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state");
 assertEquals(State.TIMED_OUT, barrierState);
 assertNotNull(children);
 assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size());
 assertEquals("Unexpected barrier state. Didn't find the expected members.", ImmutableList.of("p1", "p2"), children);
}
