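This article collects code examples for the Java class org.apache.samza.job.yarn.YarnClusterResourceManager and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as a practical reference. Details of the YarnClusterResourceManager class follow: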
Package path: org.apache.samza.job.yarn.YarnClusterResourceManager
Class name: YarnClusterResourceManager
A YarnClusterResourceManager implements a ClusterResourceManager using Yarn as the underlying resource manager. This class acts as an adapter between Yarn and Samza: it translates Yarn callbacks into the Samza-specific callback methods specified in Callback.
Thread-safety:
1. Start and stop methods should NOT be called from multiple threads.
2. ALL callbacks from the YarnContainerManager are invoked from a single Callback thread of the AMRMClient.
3. Stop should not be called more than once.
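The adapter role described above can be illustrated with a minimal sketch. This is not Samza's code: the nested types `NativeEvents`, `AppCallback`, and `Adapter` are hypothetical stand-ins for the Yarn callback handler and Samza's Callback, and the `"resource-"` prefix is an invented translation used only to make the forwarding visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CallbackAdapterSketch {
  // Hypothetical stand-in for the cluster manager's native callback interface.
  interface NativeEvents {
    void onContainersCompleted(List<String> nativeContainerIds);
  }

  // Hypothetical stand-in for the application-facing callback interface.
  interface AppCallback {
    void onResourcesCompleted(List<String> resourceIds);
  }

  // The adapter implements the native interface and forwards each event
  // to the application callback in the application's own vocabulary.
  static final class Adapter implements NativeEvents {
    private final AppCallback callback;

    Adapter(AppCallback callback) {
      this.callback = callback;
    }

    @Override
    public void onContainersCompleted(List<String> nativeContainerIds) {
      List<String> translated = new ArrayList<>();
      for (String id : nativeContainerIds) {
        translated.add("resource-" + id); // illustrative translation only
      }
      // One application callback per native callback.
      callback.onResourcesCompleted(translated);
    }
  }

  public static void main(String[] args) {
    Adapter adapter = new Adapter(ids -> System.out.println("completed: " + ids));
    adapter.onContainersCompleted(Arrays.asList("c1", "c2"));
  }
}
```

The application code only ever sees `AppCallback`, so the cluster manager behind the adapter could in principle be swapped without touching it.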
Code example source: origin: org.apache.samza/samza-yarn (the same code appears in org.apache.samza/samza-yarn_2.11)
  @Override
  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
    log.info("Creating an instance of a cluster resource manager for Yarn. ");
    JobModelManager jobModelManager = state.jobModelManager;
    Config config = jobModelManager.jobModel().getConfig();
    YarnClusterResourceManager manager = new YarnClusterResourceManager(config, jobModelManager, callback, state);
    return manager;
  }
}
Code example source: origin: org.apache.samza/samza-yarn (the same code appears in org.apache.samza/samza-yarn_2.11)
  /**
   * Requests the launch of a StreamProcessor with the specified ID on the resource.
   * @param resource the SamzaResource on which to launch the StreamProcessor
   * @param builder the builder to build the resource launch command from
   *
   * TODO: Support non-builder methods to launch resources. Maybe, refactor into a ContainerLaunchStrategy interface
   */
  @Override
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
    String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
    log.info("Received launch request for {} on hostname {}", containerIDStr, resource.getHost());
    synchronized (lock) {
      try {
        Container container = allocatedResources.get(resource);
        if (container == null) {
          log.info("Resource {} already released. ", resource);
          return;
        }
        runContainer(containerIDStr, container, builder);
      } catch (Throwable t) {
        log.error("Error in launching stream processor:", t);
        clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
      }
    }
  }
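The launch method above follows a recognizable pattern: look up the allocated container under a lock, return quietly if the resource was already released, and report any error through a callback instead of rethrowing it. The following is a simplified sketch of that pattern under stated assumptions. `GuardedLaunchSketch`, `ContainerRunner`, and `LaunchFailureCallback` are hypothetical names, resources and containers are reduced to strings, and the sketch catches `Exception` where the original catches `Throwable`.

```java
import java.util.HashMap;
import java.util.Map;

class GuardedLaunchSketch {
  // Hypothetical action that starts the process inside a container.
  interface ContainerRunner {
    void run(String containerHandle) throws Exception;
  }

  // Hypothetical callback used to report launch failures to the caller.
  interface LaunchFailureCallback {
    void onLaunchFailure(String resourceId, Throwable cause);
  }

  private final Object lock = new Object();
  private final Map<String, String> allocated = new HashMap<>(); // resource ID -> container handle
  private final ContainerRunner runner;
  private final LaunchFailureCallback callback;

  GuardedLaunchSketch(ContainerRunner runner, LaunchFailureCallback callback) {
    this.runner = runner;
    this.callback = callback;
  }

  void allocate(String resourceId, String containerHandle) {
    synchronized (lock) {
      allocated.put(resourceId, containerHandle);
    }
  }

  void release(String resourceId) {
    synchronized (lock) {
      allocated.remove(resourceId);
    }
  }

  // Returns true only if the runner was invoked and finished without error.
  boolean launch(String resourceId) {
    synchronized (lock) {
      String handle = allocated.get(resourceId);
      if (handle == null) {
        return false; // already released: nothing to launch, and not an error
      }
      try {
        runner.run(handle);
        return true;
      } catch (Exception e) {
        // Failures are reported through the callback, never thrown to the caller.
        callback.onLaunchFailure(resourceId, e);
        return false;
      }
    }
  }

  public static void main(String[] args) {
    GuardedLaunchSketch sketch = new GuardedLaunchSketch(
        handle -> System.out.println("running in " + handle),
        (id, cause) -> System.out.println("failed: " + id));
    sketch.allocate("r1", "container-1");
    System.out.println(sketch.launch("r1"));
  }
}
```

Holding the lock across lookup and launch means a concurrent release cannot remove the container between the two steps, at the cost of serializing launches.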
Code example source: origin: org.apache.samza/samza-yarn_2.11 (the same code appears in org.apache.samza/samza-yarn)
    log.info("Container ID {} using command {}", samzaContainerId, command);
    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
    printContainerEnvironmentVariables(samzaContainerId, env);
    state.pendingYarnContainers.put(samzaContainerId, new YarnContainer(container));
    startContainer(
        packagePath,
        container,
        env,
        getFormattedCommand(
            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
            jobLib,
Code example source: origin: org.apache.samza/samza-yarn_2.11 (the same code appears in org.apache.samza/samza-yarn)
  @Override
  public void onStartContainerError(ContainerId yarnContainerId, Throwable t) {
    log.error(String.format("Yarn Container: %s could not start.", yarnContainerId), t);
    String samzaContainerId = getPendingSamzaContainerId(yarnContainerId);
    if (samzaContainerId != null) {
      YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
      log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", yarnContainerId, samzaContainerId);
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), yarnContainerId.toString());
      log.info("Invoking failure callback for container: {}", yarnContainerId);
      clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
    } else {
      log.info("Got an invalid notification for container: {}", yarnContainerId);
    }
  }
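The error handler above depends on getPendingSamzaContainerId, whose body is not shown in these examples. One plausible way to do such a reverse lookup is to scan the pending map for the entry holding the given Yarn container ID. The sketch below rests on that assumption and is not Samza's implementation. `PendingLookupSketch` and its methods are hypothetical, and both kinds of ID are reduced to strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingLookupSketch {
  // Samza-side container ID -> Yarn-side container ID, for containers not yet started.
  private final Map<String, String> pending = new ConcurrentHashMap<>();

  void addPending(String samzaId, String yarnId) {
    pending.put(samzaId, yarnId);
  }

  // Returns the Samza-side ID whose pending entry holds yarnId, or null if none does.
  String findPendingSamzaId(String yarnId) {
    for (Map.Entry<String, String> entry : pending.entrySet()) {
      if (entry.getValue().equals(yarnId)) {
        return entry.getKey();
      }
    }
    return null;
  }

  // Handles a start failure: drops the pending entry and reports whether one existed.
  boolean onStartError(String yarnId) {
    String samzaId = findPendingSamzaId(yarnId);
    if (samzaId == null) {
      return false; // notification for a container that is not pending: ignore it
    }
    pending.remove(samzaId);
    return true;
  }

  public static void main(String[] args) {
    PendingLookupSketch sketch = new PendingLookupSketch();
    sketch.addPending("0", "container_01");
    System.out.println(sketch.findPendingSamzaId("container_01"));
    System.out.println(sketch.onStartError("container_01"));
  }
}
```

A linear scan is acceptable when only a handful of containers are pending at once. A second map keyed by the Yarn ID would avoid the scan but would have to be kept in sync with the first.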
Code example source: origin: org.apache.samza/samza-yarn_2.11 (the same code appears in org.apache.samza/samza-yarn)
  /**
   * Stops the YarnContainerManager and all its sub-components.
   * Stop should NOT be called from multiple threads.
   * TODO: fix this to make stop idempotent?
   */
  @Override
  public void stop(SamzaApplicationState.SamzaAppStatus status) {
    log.info("Stopping AM client ");
    lifecycle.onShutdown(status);
    amClient.stop();
    log.info("Stopping the AM service ");
    nmClientAsync.stop();
    log.info("Stopping the NM service ");
    service.onShutdown();
    metrics.stop();
    if (status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
      cleanupStagingDir();
    }
  }
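The TODO in the Javadoc above asks whether stop could be made idempotent. One common way to get that property is to guard the shutdown logic with an `AtomicBoolean` and `compareAndSet`. The sketch below shows the guard in isolation. It is a suggestion, not what Samza does, and `IdempotentStopSketch` and its `shutdownAction` parameter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class IdempotentStopSketch {
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final Runnable shutdownAction;

  IdempotentStopSketch(Runnable shutdownAction) {
    this.shutdownAction = shutdownAction;
  }

  // Runs the shutdown action on the first call only.
  // Returns true if this call performed the shutdown, false if it was already done.
  boolean stop() {
    // compareAndSet flips false -> true atomically, so exactly one caller wins,
    // even if several threads call stop() at the same time.
    if (!stopped.compareAndSet(false, true)) {
      return false;
    }
    shutdownAction.run();
    return true;
  }

  public static void main(String[] args) {
    IdempotentStopSketch sketch = new IdempotentStopSketch(() -> System.out.println("shutting down"));
    System.out.println(sketch.stop());
    System.out.println(sketch.stop());
  }
}
```

With such a guard, a second call returns immediately instead of stopping the same clients twice. Callers that lose the race do not wait for the shutdown to finish, which may or may not be acceptable depending on the caller.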
Code example source: origin: org.apache.samza/samza-yarn_2.11 (the same code appears in org.apache.samza/samza-yarn)
  /**
   * Callback invoked from Yarn when containers complete. This translates the yarn callbacks into Samza specific
   * ones.
   *
   * @param statuses the YarnContainerStatus callbacks from Yarn.
   */
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<>();
    for (ContainerStatus status : statuses) {
      log.info("Container completed from RM " + status);
      SamzaResourceStatus samzaResrcStatus = new SamzaResourceStatus(status.getContainerId().toString(), status.getDiagnostics(), status.getExitStatus());
      samzaResrcStatuses.add(samzaResrcStatus);
      String completedContainerID = getIDForContainer(status.getContainerId().toString());
      log.info("Completed container had ID: {}", completedContainerID);
      // Remove the container from the list of running containers. If it failed with a non-zero exit code,
      // add it to the list of failed containers.
      if (!completedContainerID.equals(INVALID_YARN_CONTAINER_ID)) {
        if (state.runningYarnContainers.containsKey(completedContainerID)) {
          log.info("Removing container ID {} from completed containers", completedContainerID);
          state.runningYarnContainers.remove(completedContainerID);
          if (status.getExitStatus() != ContainerExitStatus.SUCCESS) {
            state.failedContainersStatus.put(status.getContainerId().toString(), status);
          }
        }
      }
    }
    clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
  }
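The bookkeeping in the completion callback above can be shown in isolation: every completed container is reported, but only containers that were tracked as running are removed, and only those with a non-success exit status are recorded as failed. The sketch below is a simplification under stated assumptions. `CompletionSketch` and its nested `Status` type are hypothetical, both maps are keyed by a single container ID (the original uses a Samza ID for one and a Yarn ID for the other), and the success exit code is assumed to be 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CompletionSketch {
  static final int SUCCESS = 0; // assumed success exit code for this sketch

  // Hypothetical, simplified container status.
  static final class Status {
    final String containerId;
    final int exitStatus;

    Status(String containerId, int exitStatus) {
      this.containerId = containerId;
      this.exitStatus = exitStatus;
    }
  }

  final Map<String, String> running = new HashMap<>(); // container ID -> processor ID
  final Map<String, Status> failed = new HashMap<>();  // container ID -> last status

  // Processes a batch of completion reports and returns the IDs to forward,
  // in arrival order, so the caller can pass them on in a single callback.
  List<String> onCompleted(List<Status> statuses) {
    List<String> completed = new ArrayList<>();
    for (Status status : statuses) {
      completed.add(status.containerId); // every report is forwarded
      // Only containers that were tracked as running change the bookkeeping.
      boolean wasRunning = running.remove(status.containerId) != null;
      if (wasRunning && status.exitStatus != SUCCESS) {
        failed.put(status.containerId, status);
      }
    }
    return completed;
  }

  public static void main(String[] args) {
    CompletionSketch sketch = new CompletionSketch();
    sketch.running.put("c1", "0");
    sketch.running.put("c2", "1");
    List<String> forwarded = sketch.onCompleted(Arrays.asList(new Status("c1", 0), new Status("c2", 1)));
    System.out.println(forwarded + " failed=" + sketch.failed.keySet());
  }
}
```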
Code example source: origin: org.apache.samza/samza-yarn (the same code appears in org.apache.samza/samza-yarn_2.11)
  @Override
  public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
    log.info("Received a containerStart notification from the NodeManager for container: {} ", containerId);
    String samzaContainerId = getPendingSamzaContainerId(containerId);
    if (samzaContainerId != null) {
      // 1. Move the container from pending to running state
      final YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
      log.info("Samza containerId:{} has started", samzaContainerId);
      state.runningYarnContainers.put(samzaContainerId, container);
      // 2. Invoke the success callback.
      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
    } else {
      log.info("Got an invalid notification from YARN for container: {}", containerId);
    }
  }