gpt4 book ai didi

org.apache.samza.job.yarn.YarnClusterResourceManager类的使用及代码示例

转载 作者:知者 更新时间:2024-03-19 00:37:31 25 4
gpt4 key购买 nike

本文整理了Java中org.apache.samza.job.yarn.YarnClusterResourceManager类的一些代码示例,展示了YarnClusterResourceManager类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。YarnClusterResourceManager类的具体详情如下:
包路径:org.apache.samza.job.yarn.YarnClusterResourceManager
类名称:YarnClusterResourceManager

YarnClusterResourceManager介绍

[英]An YarnClusterResourceManager implements a ClusterResourceManager using Yarn as the underlying resource manager. This class is as an adaptor between Yarn and translates Yarn callbacks into Samza specific callback methods as specified in Callback. Thread-safety: 1.Start and stop methods should NOT be called from multiple threads. 2.ALL callbacks from the YarnContainerManager are invoked from a single Callback thread of the AMRMClient. 3.Stop should not be called more than once.
[中]YarnClusterResourceManager使用纱线作为底层资源管理器来实现ClusterResourceManager。这个类作为Thread之间的适配器,将Thread回调转换为回调中指定的Samza特定回调方法。线程安全:1。启动和停止方法不应从多个线程中调用。2.YarnContainerManager的所有回调都是从AMRMClient的单个回调线程调用的。3.停止的次数不应超过一次。

代码示例

代码示例来源:origin: org.apache.samza/samza-yarn

@Override
 public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
  log.info("Creating an instance of a cluster resource manager for Yarn. ");
  JobModelManager jobModelManager = state.jobModelManager;
  Config config = jobModelManager.jobModel().getConfig();
  YarnClusterResourceManager manager = new YarnClusterResourceManager(config, jobModelManager, callback, state);
  return manager;
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn

/**
 *
 * Requests the launch of a StreamProcessor with the specified ID on the resource
 * @param resource , the SamzaResource on which to launch the StreamProcessor
 * @param builder, the builder to build the resource launch command from
 *
 * TODO: Support non-builder methods to launch resources. Maybe, refactor into a ContainerLaunchStrategy interface
 */
@Override
public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
 String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
 log.info("Received launch request for {} on hostname {}", containerIDStr, resource.getHost());
 synchronized (lock) {
  try {
   Container container = allocatedResources.get(resource);
   if (container == null) {
    log.info("Resource {} already released. ", resource);
    return;
   }
   runContainer(containerIDStr, container, builder);
  } catch (Throwable t) {
   log.error("Error in launching stream processor:", t);
   clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
  }
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

log.info("Container ID {} using command {}", samzaContainerId, command);
Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
printContainerEnvironmentVariables(samzaContainerId, env);
state.pendingYarnContainers.put(samzaContainerId, new YarnContainer(container));
startContainer(
  packagePath,
  container,
  env,
  getFormattedCommand(
    ApplicationConstants.LOG_DIR_EXPANSION_VAR,
    jobLib,

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

@Override
public void onStartContainerError(ContainerId yarnContainerId, Throwable t) {
 log.error(String.format("Yarn Container: %s could not start.", yarnContainerId), t);
 String samzaContainerId = getPendingSamzaContainerId(yarnContainerId);
 if (samzaContainerId != null) {
  YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
  log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", yarnContainerId, samzaContainerId);
  SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
    container.resource().getMemory(), container.nodeId().getHost(), yarnContainerId.toString());
  log.info("Invoking failure callback for container: {}", yarnContainerId);
  clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
 } else {
  log.info("Got an invalid notification for container: {}", yarnContainerId);
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

/**
 * Stops the YarnContainerManager and all its sub-components.
 * Stop should NOT be called from multiple threads.
 * TODO: fix this to make stop idempotent?.
 */
@Override
public void stop(SamzaApplicationState.SamzaAppStatus status) {
 log.info("Stopping AM client " );
 lifecycle.onShutdown(status);
 amClient.stop();
 log.info("Stopping the AM service " );
 nmClientAsync.stop();
 log.info("Stopping the NM service " );
 service.onShutdown();
 metrics.stop();
 if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
  cleanupStagingDir();
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

/**
 * Callback invoked from Yarn when containers complete. This translates the yarn callbacks into Samza specific
 * ones.
 *
 * @param statuses the YarnContainerStatus callbacks from Yarn.
 */
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
 List<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<>();
 for(ContainerStatus status: statuses) {
  log.info("Container completed from RM " + status);
  SamzaResourceStatus samzaResrcStatus = new SamzaResourceStatus(status.getContainerId().toString(), status.getDiagnostics(), status.getExitStatus());
  samzaResrcStatuses.add(samzaResrcStatus);
  String completedContainerID = getIDForContainer(status.getContainerId().toString());
  log.info("Completed container had ID: {}", completedContainerID);
  //remove the container from the list of running containers, if failed with a non-zero exit code, add it to the list of
  //failed containers.
  if(!completedContainerID.equals(INVALID_YARN_CONTAINER_ID)){
   if(state.runningYarnContainers.containsKey(completedContainerID)) {
    log.info("Removing container ID {} from completed containers", completedContainerID);
    state.runningYarnContainers.remove(completedContainerID);
    if(status.getExitStatus() != ContainerExitStatus.SUCCESS)
     state.failedContainersStatus.put(status.getContainerId().toString(), status);
   }
  }
 }
 clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
}

代码示例来源:origin: org.apache.samza/samza-yarn

log.info("Container ID {} using command {}", samzaContainerId, command);
Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
printContainerEnvironmentVariables(samzaContainerId, env);
state.pendingYarnContainers.put(samzaContainerId, new YarnContainer(container));
startContainer(
  packagePath,
  container,
  env,
  getFormattedCommand(
    ApplicationConstants.LOG_DIR_EXPANSION_VAR,
    jobLib,

代码示例来源:origin: org.apache.samza/samza-yarn

@Override
public void onStartContainerError(ContainerId yarnContainerId, Throwable t) {
 log.error(String.format("Yarn Container: %s could not start.", yarnContainerId), t);
 String samzaContainerId = getPendingSamzaContainerId(yarnContainerId);
 if (samzaContainerId != null) {
  YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
  log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", yarnContainerId, samzaContainerId);
  SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
    container.resource().getMemory(), container.nodeId().getHost(), yarnContainerId.toString());
  log.info("Invoking failure callback for container: {}", yarnContainerId);
  clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
 } else {
  log.info("Got an invalid notification for container: {}", yarnContainerId);
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn

/**
 * Stops the YarnContainerManager and all its sub-components.
 * Stop should NOT be called from multiple threads.
 * TODO: fix this to make stop idempotent?.
 */
@Override
public void stop(SamzaApplicationState.SamzaAppStatus status) {
 log.info("Stopping AM client " );
 lifecycle.onShutdown(status);
 amClient.stop();
 log.info("Stopping the AM service " );
 nmClientAsync.stop();
 log.info("Stopping the NM service " );
 service.onShutdown();
 metrics.stop();
 if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
  cleanupStagingDir();
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn

/**
 * Callback invoked from Yarn when containers complete. This translates the yarn callbacks into Samza specific
 * ones.
 *
 * @param statuses the YarnContainerStatus callbacks from Yarn.
 */
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
 List<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<>();
 for(ContainerStatus status: statuses) {
  log.info("Container completed from RM " + status);
  SamzaResourceStatus samzaResrcStatus = new SamzaResourceStatus(status.getContainerId().toString(), status.getDiagnostics(), status.getExitStatus());
  samzaResrcStatuses.add(samzaResrcStatus);
  String completedContainerID = getIDForContainer(status.getContainerId().toString());
  log.info("Completed container had ID: {}", completedContainerID);
  //remove the container from the list of running containers, if failed with a non-zero exit code, add it to the list of
  //failed containers.
  if(!completedContainerID.equals(INVALID_YARN_CONTAINER_ID)){
   if(state.runningYarnContainers.containsKey(completedContainerID)) {
    log.info("Removing container ID {} from completed containers", completedContainerID);
    state.runningYarnContainers.remove(completedContainerID);
    if(status.getExitStatus() != ContainerExitStatus.SUCCESS)
     state.failedContainersStatus.put(status.getContainerId().toString(), status);
   }
  }
 }
 clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
}

代码示例来源:origin: org.apache.samza/samza-yarn

@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
 log.info("Received a containerStart notification from the NodeManager for container: {} ", containerId);
 String samzaContainerId = getPendingSamzaContainerId(containerId);
 if (samzaContainerId != null) {
  // 1. Move the container from pending to running state
  final YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
  log.info("Samza containerId:{} has started", samzaContainerId);
  state.runningYarnContainers.put(samzaContainerId, container);
  // 2. Invoke the success callback.
  SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
    container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
  clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
 } else {
  log.info("Got an invalid notification from YARN for container: {}", containerId);
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

@Override
 public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
  log.info("Creating an instance of a cluster resource manager for Yarn. ");
  JobModelManager jobModelManager = state.jobModelManager;
  Config config = jobModelManager.jobModel().getConfig();
  YarnClusterResourceManager manager = new YarnClusterResourceManager(config, jobModelManager, callback, state);
  return manager;
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

/**
 *
 * Requests the launch of a StreamProcessor with the specified ID on the resource
 * @param resource , the SamzaResource on which to launch the StreamProcessor
 * @param builder, the builder to build the resource launch command from
 *
 * TODO: Support non-builder methods to launch resources. Maybe, refactor into a ContainerLaunchStrategy interface
 */
@Override
public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
 String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
 log.info("Received launch request for {} on hostname {}", containerIDStr, resource.getHost());
 synchronized (lock) {
  try {
   Container container = allocatedResources.get(resource);
   if (container == null) {
    log.info("Resource {} already released. ", resource);
    return;
   }
   runContainer(containerIDStr, container, builder);
  } catch (Throwable t) {
   log.error("Error in launching stream processor:", t);
   clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
  }
 }
}

代码示例来源:origin: org.apache.samza/samza-yarn_2.11

@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
 log.info("Received a containerStart notification from the NodeManager for container: {} ", containerId);
 String samzaContainerId = getPendingSamzaContainerId(containerId);
 if (samzaContainerId != null) {
  // 1. Move the container from pending to running state
  final YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
  log.info("Samza containerId:{} has started", samzaContainerId);
  state.runningYarnContainers.put(samzaContainerId, container);
  // 2. Invoke the success callback.
  SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
    container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
  clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
 } else {
  log.info("Got an invalid notification from YARN for container: {}", containerId);
 }
}

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com