gpt4 book ai didi

org.apache.flink.yarn.highavailability.YarnHighAvailabilityServices类的使用及代码示例

转载 作者:知者 更新时间:2024-03-16 14:07:31 28 4
gpt4 key购买 nike

本文整理了Java中org.apache.flink.yarn.highavailability.YarnHighAvailabilityServices类的一些代码示例,展示了YarnHighAvailabilityServices类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。YarnHighAvailabilityServices类的具体详情如下:
包路径:org.apache.flink.yarn.highavailability.YarnHighAvailabilityServices
类名称:YarnHighAvailabilityServices

YarnHighAvailabilityServices介绍

[英]The basis of HighAvailabilityServices for YARN setups. These high-availability services auto-configure YARN's HDFS and the YARN application's working directory to be used to store job recovery data.

Note for implementers: This class locks access to and creation of services, to make sure all services are properly shut down when shutting down this class. To participate in the checks, overriding methods should frame method body with calls to enter() and exit() as shown in the following pattern:

public LeaderRetrievalService getResourceManagerLeaderRetriever() {
  enter();
  try {
    // ... create and return the service ...
  } finally {
    exit();
  }
}

[中]

代码示例

代码示例来源:origin: apache/flink

/**
 * Checks that {@code close()} can be called after {@code closeAndCleanupAllData()}
 * without an exception being thrown.
 */
@Test
public void testRepeatedClose() throws Exception {
  final Configuration configuration = new Configuration();
  final YarnHighAvailabilityServices haServices =
    new YarnIntraNonHaMasterServices(configuration, hadoopConfig);

  // first shutdown, including cleanup of all data
  haServices.closeAndCleanupAllData();

  // a second shutdown must be tolerated and must not throw
  haServices.close();
}

代码示例来源:origin: apache/flink

/**
 * Hands out the blob store held in the {@code blobStoreService} field.
 *
 * <p>The field read is bracketed by {@code enter()} and {@code exit()}; {@code exit()}
 * sits in a {@code finally} block and therefore runs on every path out of the guarded
 * section.
 *
 * @return the value of {@code blobStoreService}
 * @throws IOException declared by the method signature
 */
@Override
public BlobStore createBlobStore() throws IOException {
  enter();
  final BlobStore store;
  try {
    store = blobStoreService;
  } finally {
    exit();
  }
  return store;
}

代码示例来源:origin: apache/flink

/**
 * Entry guard to be invoked first in every method that creates an HA service. Acquires
 * the lock and checks whether this HighAvailabilityServices instance has been shut down.
 *
 * @throws IllegalStateException with message {@code "closed"} when
 *     {@code enterUnlessClosed()} returns {@code false}
 */
void enter() {
  final boolean entered = enterUnlessClosed();
  if (entered) {
    return;
  }
  throw new IllegalStateException("closed");
}

代码示例来源:origin: apache/flink

// This accessor is expected to be rejected with UnsupportedOperationException.
try {
  services.getSubmittedJobGraphStore();
  fail();
} catch (UnsupportedOperationException ignored) {
  // expected
}

services.close();

// After close(), each of the following accessors is expected to throw
// IllegalStateException; reaching fail() means the call wrongly succeeded.
try {
  services.createBlobStore();
  fail();
} catch (IllegalStateException ignored) {
  // expected
}
try {
  services.getCheckpointRecoveryFactory();
  fail();
} catch (IllegalStateException ignored) {
  // expected
}
try {
  services.getJobManagerLeaderElectionService(new JobID());
  fail();
} catch (IllegalStateException ignored) {
  // expected
}
try {
  services.getJobManagerLeaderRetriever(new JobID());
  fail();
} catch (IllegalStateException ignored) {
  // expected
}
try {
  services.getRunningJobsRegistry();
  fail();
} catch (IllegalStateException ignored) {
  // expected
}
try {
  services.getResourceManagerLeaderElectionService();
  fail();
} catch (IllegalStateException ignored) {
  // expected
}

代码示例来源:origin: apache/flink

/**
 * Checks that closing the services leads to {@code handleError} being invoked on a
 * contender that was started on the ResourceManager leader election service.
 */
@Test
public void testClosingReportsToLeader() throws Exception {
  final Configuration configuration = new Configuration();

  try (YarnHighAvailabilityServices haServices =
      new YarnIntraNonHaMasterServices(configuration, hadoopConfig)) {
    final LeaderElectionService electionService = haServices.getResourceManagerLeaderElectionService();
    final LeaderRetrievalService retrievalService = haServices.getResourceManagerLeaderRetriever();

    final LeaderContender leaderContender = mockContender(electionService);
    final LeaderRetrievalListener retrievalListener = mock(LeaderRetrievalListener.class);

    electionService.start(leaderContender);
    retrievalService.start(retrievalListener);

    // wait until the contender has become the leader, i.e. the listener was notified once
    verify(retrievalListener, timeout(1000L).times(1)).notifyLeaderAddress(anyString(), any(UUID.class));

    // close explicitly inside the try-with-resources block, then expect the error callback
    haServices.close();
    verify(leaderContender, timeout(1000L).times(1)).handleError(any(Exception.class));
  }
}

代码示例来源:origin: apache/flink

close();

代码示例来源:origin: apache/flink

/**
 * Checks three things about {@code closeAndCleanupAllData()}: the recovery data directory
 * under the working directory can no longer be found afterwards, the services report
 * themselves as closed, and a second cleanup call is rejected with
 * {@link IllegalStateException}.
 */
@Test
public void testCloseAndCleanup() throws Exception {
  final Configuration configuration = new Configuration();
  configuration.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
  configuration.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);

  // create the services
  final YarnHighAvailabilityServices haServices = new YarnPreConfiguredMasterNonHaServices(
    configuration,
    hadoopConfig,
    HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

  haServices.closeAndCleanupAllData();

  final FileSystem fs = hdfsRootPath.getFileSystem();
  final Path workingDirectory = new Path(hdfsCluster.getFileSystem().getWorkingDirectory().toString());

  try {
    final Path recoveryDir = new Path(workingDirectory, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR);
    fs.getFileStatus(recoveryDir);
    fail("Flink recovery data directory still exists");
  } catch (FileNotFoundException expected) {
    // expected, because the directory should have been cleaned up
  }

  assertTrue(haServices.isClosed());

  // a further cleanup on already-closed services should fail
  try {
    haServices.closeAndCleanupAllData();
    fail("should fail with an IllegalStateException");
  } catch (IllegalStateException expected) {
    // expected
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn_2.11

close();

代码示例来源:origin: org.apache.flink/flink-yarn_2.11

/**
 * Returns the blob store kept in the {@code blobStoreService} field.
 *
 * <p>{@code enter()} is called before the field is read and {@code exit()} is called
 * from a {@code finally} block afterwards.
 *
 * @return the value of {@code blobStoreService}
 * @throws IOException declared by the method signature
 */
@Override
public BlobStore createBlobStore() throws IOException {
  enter();
  final BlobStore result;
  try {
    result = blobStoreService;
  } finally {
    exit();
  }
  return result;
}

代码示例来源:origin: org.apache.flink/flink-yarn

close();

代码示例来源:origin: org.apache.flink/flink-yarn_2.11

/**
 * Must be the first call in every method that creates an HA service. Acquires the lock
 * and checks whether this HighAvailabilityServices instance is shut down.
 *
 * @throws IllegalStateException with message {@code "closed"} if
 *     {@code enterUnlessClosed()} returns {@code false}
 */
void enter() {
  if (enterUnlessClosed()) {
    // entered successfully; nothing more to do
    return;
  }
  throw new IllegalStateException("closed");
}

代码示例来源:origin: org.apache.flink/flink-yarn

/**
 * Provides the blob store referenced by the {@code blobStoreService} field.
 *
 * <p>The read happens between {@code enter()} and {@code exit()}; the latter is placed
 * in a {@code finally} block.
 *
 * @return the value of {@code blobStoreService}
 * @throws IOException declared by the method signature
 */
@Override
public BlobStore createBlobStore() throws IOException {
  enter();
  final BlobStore blobStore;
  try {
    blobStore = blobStoreService;
  } finally {
    exit();
  }
  return blobStore;
}

代码示例来源:origin: org.apache.flink/flink-yarn

/**
 * Guard for the start of every method that creates an HA service. Acquires the lock and
 * checks whether this HighAvailabilityServices instance is shut down.
 *
 * @throws IllegalStateException with message {@code "closed"} if
 *     {@code enterUnlessClosed()} returns {@code false}
 */
void enter() {
  final boolean open = enterUnlessClosed();
  if (open) {
    return;
  }
  throw new IllegalStateException("closed");
}

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com