gpt4 book ai didi

org.apache.helix.task.WorkflowConfig类的使用及代码示例

转载 作者:知者 更新时间:2024-03-24 02:47:05 27 4
gpt4 key购买 nike

本文整理了Java中org.apache.helix.task.WorkflowConfig类的一些代码示例,展示了WorkflowConfig类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。WorkflowConfig类的具体详情如下:
包路径:org.apache.helix.task.WorkflowConfig
类名称:WorkflowConfig

WorkflowConfig介绍

[英]Provides a typed interface to workflow level configurations. Validates the configurations.
[中]为工作流级别的配置提供类型化的接口,并对这些配置进行校验。

代码示例

代码示例来源:origin: apache/helix

/**
 * Builds a workflow config from an existing one, using a caller-supplied workflow id.
 * Every other value is read from {@code cfg} through its getters and forwarded, in
 * positional order, to the full-argument constructor of this class.
 * @param cfg existing config whose settings are forwarded; it is dereferenced directly,
 *          so it must not be null
 * @param workflowId id passed as the first argument of the delegated constructor
 */
public WorkflowConfig(WorkflowConfig cfg, String workflowId) {
 // An explicit constructor call must be the first statement; argument order is positional.
 this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
   cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
   cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.getJobPurgeInterval(),
   cfg.isAllowOverlapJobAssignment(), cfg.getTimeout());
}

代码示例来源:origin: apache/helix

/**
 * Exposes the resource configuration of the wrapped workflow config.
 * @return the key/value map obtained from {@code _workflowConfig.getResourceConfigMap()}
 * @throws HelixException declared by the signature; propagated from the delegate call
 */
public Map<String, String> getResourceConfigMap() throws HelixException {
 // Pure delegation: no copying or filtering is applied to the delegate's result.
 final Map<String, String> resourceConfig = _workflowConfig.getResourceConfigMap();
 return resourceConfig;
}

代码示例来源:origin: org.apache.helix/helix-core

/**
 * Decides whether a workflow may be scheduled right now.
 * A workflow without a start time is always ready; otherwise it is ready once its
 * start time is not later than the current system time.
 * @param workflowCfg the workflow config whose start time is inspected
 * @return true if the workflow can be scheduled now, false if its start time is still ahead
 */
protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
 Date scheduledStart = workflowCfg.getStartTime();
 if (scheduledStart == null) {
  // No start time configured: nothing to wait for.
  return true;
 }
 return scheduledStart.getTime() <= System.currentTimeMillis();
}

代码示例来源:origin: org.apache.helix/helix-core

for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
 TaskState jobState = workflowCtx.getJobState(ancestor);
 if (jobState == null || jobState == TaskState.NOT_STARTED) {
if (notStartedCount > 0 || (workflowCfg.isJobQueue() && inCompleteCount >= workflowCfg
  .getParallelJobs())) {
 LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
 return emptyAssignment(resourceName, currStateOutput);
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
 LOG.info(
  && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
 LOG.info("Workflow " + workflowResource
   + " is completed and passed expiry time, cleaning up the workflow context.");
if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
  && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
 LOG.info("Job " + resourceName
   + " is completed and passed expiry time, cleaning up the job context.");

代码示例来源:origin: org.apache.helix/helix-core

/**
 * Clean up a workflow. Cancels the pending scheduled rebalances of the workflow and of every
 * job in its DAG, then delegates to {@code TaskUtil.removeWorkflow}, which (per the original
 * documentation of this method) removes the workflow config, idealstate, externalview and
 * workflow contexts, and all jobs information, including their configs, context, IS and EV.
 * Cleanup only happens when the workflow is terminable or its target state is DELETE;
 * otherwise the method just logs that nothing was done.
 * @param workflow name of the workflow to clean up
 * @param workflowcfg config of that workflow; supplies the terminable flag, target state and
 *          job DAG
 */
private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
 LOG.info("Cleaning up workflow: " + workflow);
 if (!workflowcfg.isTerminable() && workflowcfg.getTargetState() != TargetState.DELETE) {
  // This branch is reached only when the workflow is NOT terminable and NOT marked DELETE;
  // the previous message ("neither ... non-terminable nor ...") stated the opposite.
  LOG.info("Did not clean up workflow " + workflow
    + " because the workflow is neither terminable nor set to DELETE.");
  return;
 }
 Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
 // Remove all pending timer tasks for this workflow if exists
 _rebalanceScheduler.removeScheduledRebalance(workflow);
 for (String job : jobs) {
  _rebalanceScheduler.removeScheduledRebalance(job);
 }
 if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
   _manager.getHelixPropertyStore(), workflow, jobs)) {
  // Best effort: a failed removal is only reported, not retried or thrown.
  LOG.warn("Failed to clean up workflow " + workflow);
 }
}

代码示例来源:origin: org.apache.helix/helix-core

/**
 * Removes a workflow through {@code TaskUtil.removeWorkflow}, passing along the jobs listed
 * in its DAG when a config can be found.
 * @param workflow name of the workflow to remove
 * @throws HelixException if {@code TaskUtil.removeWorkflow} reports failure
 */
private void removeWorkflowFromZK(String workflow) {
 // A missing WorkflowConfig does not stop the removal: a WorkflowContext may still exist,
 // so the removal is attempted with an empty job set in that case.
 WorkflowConfig config = TaskUtil.getWorkflowConfig(_accessor, workflow);
 Set<String> jobsToRemove;
 if (config == null) {
  jobsToRemove = new HashSet<>();
 } else {
  jobsToRemove = new HashSet<>(config.getJobDag().getAllNodes());
 }
 if (TaskUtil.removeWorkflow(_accessor, _propertyStore, workflow, jobsToRemove)) {
  return;
 }
 String failure = "Failed to delete the workflow " + workflow;
 LOG.info(failure);
 throw new HelixException(failure);
}

代码示例来源:origin: apache/helix

TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
 LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
 scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
 if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
  workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
  _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
 long expiryTime = workflowCfg.getExpiry();
if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
 Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
 jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
 if (jobWithFinalStates.size() > 0) {
  workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());

代码示例来源:origin: org.apache.helix/helix-core

TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
 LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
 scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
   && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
  workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
  clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
 long expiryTime = workflowCfg.getExpiry();
   workflowCfg.getStartTime().getTime());
 return buildEmptyAssignment(workflow, currStateOutput);
if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
 purgeExpiredJobs(workflow, workflowCfg, workflowCtx);

代码示例来源:origin: apache/helix

if (newWorkflowConfig.getWorkflowId() == null || newWorkflowConfig.getWorkflowId().isEmpty()) {
 newWorkflowConfig.getRecord()
   .setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), workflow);
if (workflow == null || !workflow.equals(newWorkflowConfig.getWorkflowId())) {
 throw new HelixException(String
   .format("Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}",
     workflow, newWorkflowConfig.getWorkflowId()));
if (currentConfig.isTerminable()) {
 throw new HelixException(
   "Workflow " + workflow + " is terminable, not allow to change its configuration!");
newWorkflowConfig.setJobDag(currentConfig.getJobDag());
if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
 LOG.error("Failed to update workflow configuration for workflow " + workflow);

代码示例来源:origin: org.apache.helix/helix-core

WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
 ClusterDataCache clusterDataCache) {
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
 LOG.debug("Jobs from recurring workflow are not schedule-able");
int scheduledJobs = 0;
long timeToSchedule = Long.MAX_VALUE;
for (String job : workflowCfg.getJobDag().getAllNodes()) {
 TaskState jobState = workflowCtx.getJobState(job);
 if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
 if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
  if (LOG.isDebugEnabled()) {
   LOG.debug(String.format("Workflow %s already have enough job in progress, "

代码示例来源:origin: org.apache.helix/helix-core

for (String job : cfg.getJobDag().getAllNodes()) {
 TaskState jobState = ctx.getJobState(job);
 if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
  failedJobs++;
  if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
   ctx.setWorkflowState(TaskState.FAILED);
   LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.", cfg.getWorkflowId());
   for (String jobToFail : cfg.getJobDag().getAllNodes()) {
    if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
     ctx.setJobState(jobToFail, TaskState.ABORTED);
if (!incomplete && cfg.isTerminable()) {
 ctx.setWorkflowState(TaskState.COMPLETED);
 return true;

代码示例来源:origin: apache/incubator-gobblin

/**
 * Requests deletion of the workflows returned by a {@code TaskDriver} built on the given
 * manager. Workflows whose name contains the planning or actual-job prefix are left alone
 * unless the clean-all-distributed-jobs setting is enabled, and workflows already targeted
 * for DELETE are not requested again.
 * @param helixManager manager used to construct the {@code TaskDriver}
 */
private void cleanUpJobs(HelixManager helixManager) {
 TaskDriver driver = new TaskDriver(helixManager);
 Map<String, WorkflowConfig> existing = driver.getWorkflows();
 boolean cleanupDistJobs = ConfigUtils.getBoolean(this.config,
   GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS,
   GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS);
 for (Map.Entry<String, WorkflowConfig> candidate : existing.entrySet()) {
  String name = candidate.getKey();
  boolean distributedJob = name.contains(GobblinClusterConfigurationKeys.PLANNING_CONF_PREFIX)
    || name.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX);
  if (distributedJob && !cleanupDistJobs) {
   log.info("Distributed job {} won't be deleted.", name);
   continue;
  }
  // Skip workflows for which deletion has already been requested.
  if (candidate.getValue().getTargetState() == TargetState.DELETE) {
   continue;
  }
  driver.delete(name);
  log.info("Requested delete of workflowName {}", name);
 }
}

代码示例来源:origin: apache/helix

String workflowResource, String jobResource, ClusterDataCache cache) {
if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
 return true;
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
Date startTime = scheduleConfig.getStartTime();
long currentTime = new Date().getTime();
  if (!workflowCfg.getTargetState().equals(TargetState.START)) {
   LOG.debug(
     "Skip scheduling since the workflow has not been started " + workflowResource);

代码示例来源:origin: apache/helix

for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
  TaskState jobState = workflowContext.getJobState(jobName);
  if (jobState == TaskState.IN_PROGRESS) {
 for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
  JobContext jobContext = driver.getJobContext(jobName);
  if (jobContext != null) {
 if (!workflowConfig.isAllowOverlapJobAssignment()) {
  Set<String> instances = new HashSet<String>();
  for (JobContext jobContext : jobContextList) {
return maxRunningCount > 1 && (workflowConfig.isJobQueue() ? maxRunningCount <= workflowConfig
  .getParallelJobs() : true);

代码示例来源:origin: org.apache.helix/helix-core

int incompleteParentCount = 0;
for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
 TaskState jobState = workflowCtx.getJobState(parent);
 if (jobState == null || jobState == TaskState.NOT_STARTED) {
if (workflowCfg.isJobQueue()) {
 if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
  if (LOG.isDebugEnabled()) {
   LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job,

代码示例来源:origin: org.apache.helix/helix-core

/**
 * Tells whether a workflow has finished.
 * A non-terminable workflow is never reported complete, and its job DAG is not consulted.
 * @param ctx workflow context supplying the state of each job
 * @param cfg workflow config supplying the terminable flag and the job DAG
 * @return true only if the workflow is terminable and every job in its DAG is
 *         {@link TaskState#COMPLETED}; false otherwise
 */
private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
 boolean allCompleted = cfg.isTerminable();
 if (allCompleted) {
  for (String jobName : cfg.getJobDag().getAllNodes()) {
   if (ctx.getJobState(jobName) != TaskState.COMPLETED) {
    // One unfinished job is enough to decide; stop scanning.
    allCompleted = false;
    break;
   }
  }
 }
 return allCompleted;
}

代码示例来源:origin: org.apache.helix/helix-core

WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
 ClusterDataCache cache) {
TargetState jobTgtState = workflowConfig.getTargetState();
     long finishTime = currentTime;
     workflowCtx.setJobState(jobResource, TaskState.FAILED);
     if (workflowConfig.isTerminable()) {
      workflowCtx.setWorkflowState(TaskState.FAILED);
      workflowCtx.setFinishTime(finishTime);

代码示例来源:origin: apache/helix

Set<String> ret = new HashSet<>();
if (!workflowCfg.isAllowOverlapJobAssignment()) {
 for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
  if (jobName.equals(currentJobName)) {
   continue;

代码示例来源:origin: apache/helix

/**
 * Purges expired jobs for every workflow in the cluster data cache that is either
 * non-terminable or a job queue. Does nothing (beyond a warning) when the event carries no
 * cluster data cache or no Helix manager. A failure while purging one workflow is logged
 * and does not stop the remaining workflows from being processed.
 * @param event cluster event carrying the ClusterDataCache and HelixManager attributes
 */
@Override
 public void execute(ClusterEvent event) {
  ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
  HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());

  boolean missingInput = cache == null || helixManager == null;
  if (missingInput) {
   LOG.warn(
     "ClusterDataCache or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
     event.getEventId(), event.getEventType(), event.getClusterName());
   return;
  }

  // Work on a separate set built from the cached configs rather than on the map's own view.
  Set<WorkflowConfig> snapshot = new HashSet<>(cache.getWorkflowConfigMap().values());
  for (WorkflowConfig candidate : snapshot) {
   if (candidate == null) {
    continue;
   }
   // Only queues / non-terminable workflows get their expired jobs purged.
   if (candidate.isTerminable() && !candidate.isJobQueue()) {
    continue;
   }
   try {
    TaskUtil.purgeExpiredJobs(candidate.getWorkflowId(), candidate,
      cache.getWorkflowContext(candidate.getWorkflowId()), helixManager,
      _rebalanceScheduler);
   } catch (Exception e) {
    LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
      candidate.getWorkflowId(), e.toString()));
   }
  }
 }
}

代码示例来源:origin: apache/helix

for (String job : workflowConfig.getJobDag().getAllNodes()) {
 JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
 JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
  LOG.error(String.format(
    "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
    job, workflowConfig.getWorkflowId()));
  expiry = workflowConfig.getExpiry();

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com