gpt4 book ai didi

Spring 批处理 :Restart a job and then start next job automatically

转载 作者:行者123 更新时间:2023-12-04 04:10:16 26 4
gpt4 key购买 nike

我需要创建一个恢复模式。
在我的模式中,我只能在给定的时间窗口内启 Action 业。
如果作业失败,它只会在下一个时间窗口重新启动,完成后我想开始为此窗口提前计划的计划作业。
作业之间的唯一区别是时间窗口参数。

我想到了 JobExecutionDecider 与 JobExplorer 结合使用或覆盖 Joblauncher。但一切似乎都太侵入性了。

我没有找到一个符合我需要的例子,任何想法都会受到欢迎。

最佳答案

只是根据不完整代码提供的建议来回顾一下实际完成的工作。
我创建了一个类似于下面的恢复流程。恢复流程包装了我的实际批次,仅负责为内部作业提供正确的作业参数。它可以是第一次执行时的初始参数、正常执行时的新参数或最后一次执行失败时的旧参数。

<batch:job id="recoveryWrapper"
incrementer="wrapperRunIdIncrementer"
restartable="true">
<batch:decision id="recoveryFlowDecision" decider="recoveryFlowDecider">
<batch:next on="FIRST_RUN" to="defineParametersOnFirstRun" />
<batch:next on="RECOVER" to="recover.batchJob " />
<batch:next on="CURRENT" to="current.batchJob " />
</batch:decision>
<batch:step id="defineParametersOnFirstRun" next="current.batchJob">
<batch:tasklet ref="defineParametersOnFirstRunTasklet"/>
</batch:step>
<batch:step id="recover.batchJob " next="current.batchJob">
<batch:job ref="batchJob" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor" />
</batch:step>
<batch:step id="current.batchJob" >
<batch:job ref="batchJob" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor" />
</batch:step>
</batch:job>

解决方案的核心是 RecoveryFlowDecider 作业参数提取器 使用时 Spring批量重启机制。
RecoveryFlowDecider 将查询 JobExplorer 和 JobRepository 以查明上次运行是否失败。它将最后一次执行放在包装器的执行上下文中,以便稍后在 JobParametersExtractor 中使用。
请注意使用 runIdIncremeter 允许重新执行包装器作业。
@Component
public class RecoveryFlowDecider implements JobExecutionDecider {
private static final String FIRST_RUN = "FIRST_RUN";
private static final String CURRENT = "CURRENT";
private static final String RECOVER = "RECOVER";

@Autowired
private JobExplorer jobExplorer;
@Autowired
private JobRepository jobRepository;

@Override
public FlowExecutionStatus decide(JobExecution jobExecution
,StepExecution stepExecution) {
// the wrapper is named as the wrapped job + WRAPPER
String wrapperJobName = jobExecution.getJobInstance().getJobName();
String jobName;
jobName = wrapperJobName.substring(0,wrapperJobName.indexOf(EtlConstants.WRAPPER));
List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1);
JobInstance internalJobInstance = instances.size() > 0 ? instances.get(0) : null;

if (null == internalJobInstance) {
return new FlowExecutionStatus(FIRST_RUN);
}
JobExecution lastExecution = jobRepository.getLastJobExecution(internalJobInstance.getJobName()
,internalJobInstance.getJobParameters());
//place the last execution on the context (wrapper context to use later)
jobExecution.getExecutionContext().put(EtlConstants.LAST_EXECUTION, lastExecution);
ExitStatus exitStatus = lastExecution.getExitStatus();

if (ExitStatus.FAILED.equals(exitStatus) || ExitStatus.UNKNOWN.equals(exitStatus)) {
return new FlowExecutionStatus(RECOVER);
}else if(ExitStatus.COMPLETED.equals(exitStatus)){
return new FlowExecutionStatus(CURRENT);
}
//We should never get here unless we have a defect
throw new RuntimeException("Unexpecded batch status: "+exitStatus+" in decider!");

}

}

然后 JobParametersExtractor 将再次测试上次执行的结果,如果作业失败,它将提供用于执行失败作业的原始参数,触发 Spring Bacth 重启机制。否则,它将创建一组新的参数,并将按其正常进程执行。
    @Component
public class JobExecutionWindowParametersExtractor implements
JobParametersExtractor {
@Override
public JobParameters getJobParameters(Job job, StepExecution stepExecution) {

// Read the last execution from the wrapping job
// in order to build Next Execution Window
JobExecution lastExecution= (JobExecution) stepExecution.getJobExecution().getExecutionContext().get(EtlConstants.LAST_EXECUTION);;

if(null!=lastExecution){
if (ExitStatus.FAILED.equals(lastExecution.getExitStatus())) {
JobInstance instance = lastExecution.getJobInstance();
JobParameters parameters = instance.getJobParameters();
return parameters;
}
}
//We do not have failed execution or have no execution at all we need to create a new execution window
return buildJobParamaters(lastExecution,stepExecution);
}
...
}

关于 Spring 批处理 :Restart a job and then start next job automatically,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14307908/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com