This article collects Java code examples for the org.apache.flink.yarn.YarnClusterDescriptor class and shows how the class is used in practice. The examples were extracted from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they carry real reference value. Details of the class:

Package path: org.apache.flink.yarn.YarnClusterDescriptor
Class name: YarnClusterDescriptor
Description: Implementation of AbstractYarnClusterDescriptor which is used to start the application master.
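Before the examples, here is a minimal sketch of the typical lifecycle, assuming the five-argument constructor that the examples below use (Flink configuration, YARN configuration, configuration directory, YarnClient, sharedYarnClient flag). The paths, memory sizes, and the class/method scaffolding are placeholders, not part of any Flink API:

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnSessionSketch {

    public static void main(String[] args) throws Exception {
        Configuration flinkConf = new Configuration();
        YarnConfiguration yarnConf = new YarnConfiguration(); // picks up yarn-site.xml from the classpath

        // The caller owns this client when the sharedYarnClient flag below is true.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConf);
        yarnClient.start();

        // YarnClusterDescriptor is AutoCloseable, as the try-with-resources examples below show.
        try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
                flinkConf,
                yarnConf,
                "/path/to/flink/conf",   // placeholder configuration directory
                yarnClient,
                true)) {                 // sharedYarnClient: the descriptor will not stop yarnClient
            descriptor.setLocalJarPath(new Path("/path/to/flink.jar")); // placeholder uberjar path

            ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(1024)
                .setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1)
                .createClusterSpecification();

            ClusterClient<ApplicationId> client = descriptor.deploySessionCluster(spec);
            // ... submit jobs through the returned client, then shut it down ...
            client.shutdown();
        } finally {
            yarnClient.stop();
        }
    }
}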
Code example source: apache/flink
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
    final Configuration flinkConfiguration = new Configuration();
    flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

    YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
        flinkConfiguration,
        yarnConfiguration,
        temporaryFolder.getRoot().getAbsolutePath(),
        yarnClient,
        true);

    clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));

    ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1)
        .setTaskManagerMemoryMB(1)
        .setNumberTaskManagers(1)
        .setSlotsPerTaskManager(Integer.MAX_VALUE)
        .createClusterSpecification();

    try {
        clusterDescriptor.deploySessionCluster(clusterSpecification);
        fail("The deploy call should have failed.");
    } catch (ClusterDeploymentException e) {
        // we expect the cause to be an IllegalConfigurationException
        if (!(e.getCause() instanceof IllegalConfigurationException)) {
            throw e;
        }
    } finally {
        clusterDescriptor.close();
    }
}
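This test drives the failure path on purpose: no YARN cluster can satisfy Integer.MAX_VALUE slots per TaskManager, so deploySessionCluster is expected to fail fast with a ClusterDeploymentException caused by an IllegalConfigurationException, instead of submitting an application that could never start.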
Code example source: apache/flink
@Nonnull
YarnClusterDescriptor createYarnClusterDescriptor(org.apache.flink.configuration.Configuration flinkConfiguration) {
    final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
        flinkConfiguration,
        YARN_CONFIGURATION,
        CliFrontend.getConfigurationDirectoryFromEnv(),
        yarnClient,
        true);

    yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.toURI()));
    yarnClusterDescriptor.addShipFiles(Collections.singletonList(flinkLibFolder));
    return yarnClusterDescriptor;
}
Code example source: apache/flink
@Override
public ClusterClient<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {

    // this is required because the slots are allocated lazily
    jobGraph.setAllowQueuedScheduling(true);

    try {
        return deployInternal(
            clusterSpecification,
            "Flink per-job cluster",
            getYarnJobClusterEntrypoint(),
            jobGraph,
            detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
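A short sketch of how a caller might drive this method, assuming clusterDescriptor and jobGraph are created elsewhere; the memory sizes and slot count are placeholders:

// Assumed caller-side usage; clusterDescriptor and jobGraph come from the surrounding code.
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
    .setMasterMemoryMB(1024)
    .setTaskManagerMemoryMB(1024)
    .setSlotsPerTaskManager(2)
    .createClusterSpecification();

// detached = true: deployJobCluster returns as soon as the job is submitted,
// without waiting for the job to finish.
ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(spec, jobGraph, true);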
Code example source: apache/flink
// Excerpt (truncated at the source): registering ship files with the descriptor.
try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
        new Configuration(),
        yarnConfiguration,
        /* configuration directory elided at the source */
        yarnClient,
        true)) {
    descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
    shipFiles.add(libFolder);
    descriptor.addShipFiles(shipFiles);
    descriptor.addLibFolderToShipFiles(effectiveShipFiles);
    // ...
}
Code example source: apache/flink
// Excerpt (truncated at the source): deploying a per-job cluster and killing the application.
final YarnClient yarnClient = getYarnClient();
try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
        configuration,
        getYarnConfiguration(),
        /* configuration directory and YarnClient elided at the source */
        true)) {
    yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
    yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
    yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

    clusterClient = yarnClusterDescriptor.deployJobCluster(
        clusterSpecification,
        jobGraph,
        /* detached flag elided at the source */);
    // ...
    yarnClusterDescriptor.killCluster(applicationId);
}
Code example source: apache/flink
// Excerpt (garbled at the source): the original test builds the expected launch command
// for the application master and asserts on the result of setupApplicationMasterContainer
// for several combinations of logging flags.
@Test
public void testSetupApplicationMasterContainer() {
    Configuration cfg = new Configuration();
    YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
        cfg,
        yarnConfiguration,
        /* remaining constructor arguments elided at the source */);

    final String log4j =
        "-Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; // if set
    final String mainClass = clusterDescriptor.getYarnSessionClusterEntrypoint();
    final String args = "";
    final String redirects = /* elided at the source */;

    // The elided assertions compare an expected command ending in
    // " " + mainClass + " " + args + " " + redirects against
    // clusterDescriptor.setupApplicationMasterContainer(mainClass, false, ...),
    // clusterDescriptor.setupApplicationMasterContainer(mainClass, false, ...), and
    // clusterDescriptor.setupApplicationMasterContainer(mainClass, true, ...).
}
Code example source: apache/flink
private AbstractYarnClusterDescriptor getClusterDescriptor(
        Configuration configuration,
        YarnConfiguration yarnConfiguration,
        String configurationDirectory) {
    final YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    return new YarnClusterDescriptor(
        configuration,
        yarnConfiguration,
        configurationDirectory,
        yarnClient,
        false);
}
Code example source: apache/flink
// Excerpt (truncated at the source): exercising close() with shared and owned YarnClients.
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
    new Configuration(),
    yarnConfiguration,
    /* configuration directory and YarnClient elided at the source */
    true);
yarnClusterDescriptor.close();

closableYarnClient.start();

yarnClusterDescriptor = new YarnClusterDescriptor(
    new Configuration(),
    yarnConfiguration,
    /* configuration directory and YarnClient elided at the source */
    false);
yarnClusterDescriptor.close();
Code example source: apache/flink
// Excerpt (truncated at the source): temporarily overriding the environment
// before calling addLibFolderToShipFiles.
try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
        new Configuration(),
        yarnConfiguration,
        /* remaining constructor arguments elided at the source */)) {
    CommonTestUtils.setEnv(env);
    descriptor.addLibFolderToShipFiles(effectiveShipFiles);
} finally {
    CommonTestUtils.setEnv(oldEnv);
}
Code example source: apache/flink
/**
 * Tests that Yarn will restart a killed {@link YarnSessionClusterEntrypoint} which will then resume
 * a persisted {@link JobGraph}.
 */
@Test
public void testKillYarnSessionClusterEntrypoint() throws Exception {
    assumeTrue(
        "This test kills processes via the pkill command. Thus, it only runs on Linux, Mac OS, Free BSD and Solaris.",
        OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());

    final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
    yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
    final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);

    try {
        final JobID jobId = submitJob(restClusterClient);
        final ApplicationId id = restClusterClient.getClusterId();

        waitUntilJobIsRunning(restClusterClient, jobId);

        killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
        waitForApplicationAttempt(id, 2);

        waitForJobTermination(restClusterClient, jobId);

        killApplicationAndWait(id);
    } finally {
        restClusterClient.shutdown();
    }
}
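The sequence is worth spelling out: once the job is running, killApplicationMaster terminates the YarnSessionClusterEntrypoint process via pkill, waitForApplicationAttempt(id, 2) confirms that YARN launches a second application attempt, and waitForJobTermination verifies that the restarted entrypoint recovered the persisted JobGraph and ran the job to completion, which presupposes a high-availability setup in the surrounding test class.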
Code example source: apache/flink
private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
    final int containerMemory = 256;
    final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(
        new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(containerMemory)
            .setTaskManagerMemoryMB(containerMemory)
            .setSlotsPerTaskManager(1)
            .createClusterSpecification());

    assertThat(yarnClusterClient, is(instanceOf(RestClusterClient.class)));
    return (RestClusterClient<ApplicationId>) yarnClusterClient;
}
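The assertThat guard makes the unchecked cast on the last line safe: deploySessionCluster is only declared to return ClusterClient<ApplicationId>, so the helper first verifies that the runtime type really is RestClusterClient before narrowing it.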
Code example source: apache/flink
@Test
public void testJobRecoversAfterKillingTaskManager() throws Exception {
    final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
    yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
    final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);

    try {
        final JobID jobId = submitJob(restClusterClient);
        waitUntilJobIsRunning(restClusterClient, jobId);

        stopTaskManagerContainer();
        waitUntilJobIsRestarted(restClusterClient, jobId, 1);

        waitForJobTermination(restClusterClient, jobId);

        killApplicationAndWait(restClusterClient.getClusterId());
    } finally {
        restClusterClient.shutdown();
    }
}
Code example source: apache/flink
// Excerpt (truncated at the source): deploying a detached per-job cluster, then tearing it down.
final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
    configuration,
    yarnConfiguration,
    /* configuration directory and YarnClient elided at the source */
    true);
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
    // ... (builder calls elided at the source)
    .createClusterSpecification();

final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
// ...
clusterDescriptor.killCluster(clusterId);
clusterDescriptor.close();
Code example source: DTStack/flinkStreamSQL
private AbstractYarnClusterDescriptor getClusterDescriptor(
        Configuration configuration,
        YarnConfiguration yarnConfiguration,
        String configurationDirectory) {
    return new YarnClusterDescriptor(
        configuration,
        yarnConfiguration,
        configurationDirectory,
        yarnClient,
        false);
}
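Unlike the apache/flink variant of getClusterDescriptor shown earlier, this version does not create and start a fresh YarnClient per call; it reuses a yarnClient field whose lifecycle is managed by the surrounding class.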
Code example source: apache/flink
// Excerpt (truncated at the source): deploying a session cluster and closing the descriptor.
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
    configuration,
    yarnConfiguration,
    /* configuration directory and YarnClient elided at the source */
    true);
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));

// clusterSpecification is built as in the earlier examples (elided at the source).
clusterDescriptor.deploySessionCluster(clusterSpecification);
// ...
clusterDescriptor.close();
Code example source: DTStack/flinkStreamSQL
// Excerpt (truncated at the source): attaching to a running YARN application by its id.
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, ".", yarnClient, false);
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
clusterClient.setDetached(true);