gpt4 book ai didi

java - Spring 批处理3.0 : StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job

转载 作者:行者123 更新时间:2023-12-01 20:18:20 25 4
gpt4 key购买 nike

给定一个使用分区的 Spring Batch 作业:

<job id="reportingJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="reportingJobExecutionListenerr" />
</batch:listeners>
<batch:step id="reportingMasterStep">
<partition step="reportingSlaveStep"
partitioner="reportingPartitioner">
<batch:handler grid-size="10" task-executor="taskExecutor" />
</partition>
</batch:step>
</job>

reportingSlaveStep定义为:

<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
<job ref="reportingSlaveJob" />
</step>

reportingSlaveJob定义为:

    <job id="reportingSlaveJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="reportsOutputListener" />
</batch:listeners>
<batch:split id="reportsCreationSplit"
task-executor="taskExecutor">
<batch:flow>
<batch:step id="basicReportStep">
<tasklet throttle-limit="5" task-executor="taskExecutor">
<batch:chunk reader="basicReportReader"
writer="basicReportWriter" commit-interval="500" />
</tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="advancedReportStep">
<tasklet throttle-limit="5" task-executor="taskExecutor">
<batch:chunk reader="advancedReportDataReader" writer="advancedReportWriter"
commit-interval="500" />
</tasklet>
</batch:step>
</batch:flow>
</batch:split>
</job>

我现在有 2 个问题:

  1. 我希望为每个分区创建一个新的 reportsOutputListener 实例。我可以通过将 reportsOutputListener 设为 Step 作用域 bean 来实现此目的吗?
  2. 我希望能够访问为 reportingJob 创建的相同 jobExecutionContext,以便在 reportingSlaveJob 中访问。我是否需要对此进行任何特殊处理,或者 reportingSlaveStepSlaveJob 也使用 reportingJob 中的相同 jobExecutionContext 实例吗?
  3. 编辑:当我运行上述作业时,有时会出现异常,提示“此作业的作业执行已在运行”,有时会出现 NullPointerExceptionMapExecutionContextDao.java:130 上。

编辑:另请注意,对于第 2 点,slaveJob 无法访问 stepExecutionContext 中添加的值(使用 >#{stepExecutionContext['msbfBatchId']}(在 Spring 配置 xml 中)由 reportingPartitioner 执行。针对该键的 stepExecutionContext 中的值显示为 null

最佳答案

I want a new reportsOutputListener instance to be created for each partition. Can I achieve this by making reportsOutputListener a Step scoped bean?

答案是。 (正如 Mahmoud Ben Hassine 的评论中提到的)

I want to be able to access the same jobExecutionContext created for reportingJob to be accessible in reportingSlaveJob. Do I need to any special handling for this or is the same jobExecutionContext instance in reportingJob is used by the reportingSlaveStepSlaveJob as well?

答案是。我深入研究了 Spring Batch 代码并发现 JobStep使用 JobParametersExtractor用于复制 stepExecutionContext 中的值到JobParameters 。这意味着reportingSlaveJob可以从 JobParameters 访问这些值而不是 StepExecutionContext 。也就是说,由于某种原因,DefaultJobParametersExtractor Srping Batch 3.0 中的实现似乎并未将值复制到 jobParameters正如预期的那样。我最终编写了以下自定义提取器:

public class CustomJobParametersExtractor implements JobParametersExtractor {

private Set<String> keys;

public CustomJobParametersExtractor () {
this.keys = new HashSet<>();
}

@Override
public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
JobParametersBuilder builder = new JobParametersBuilder();
Map<String, JobParameter> jobParameters = stepExecution.getJobParameters().getParameters();
ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();

// copy job parameters from parent job to delegate job
for (String key : jobParameters.keySet()) {
builder.addParameter(key, jobParameters.get(key));
}

// copy job/step context from parent job/step to delegate job
for (String key : keys) {
if (jobExecutionContext.containsKey(key)) {
builder.addString(key, jobExecutionContext.getString(key));
} else if (stepExecutionContext.containsKey(key)) {
builder.addString(key, stepExecutionContext.getString(key));
} else if (jobParameters.containsKey(key)) {
builder.addString(key, (String) jobParameters.get(key).getValue());
}
}
return builder.toJobParameters();
}

public void setKeys(String[] keys) {
this.keys = new HashSet<>(Arrays.asList(keys));
}

}

然后我可以在报告从属步骤中使用上述提取器,如下所示:

<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
<job ref="reportingSlaveJob" job-parameters-extractor="customJobParametersExtractor"/>
</step>

哪里customJobParametersExtractorCustomJobParametersExtractor 类型的 bean它传递了我们想要复制到 JobParameters 的所有 key 。的reportingSlaveJob

When I run the above job, at times I get an exception saying that the "A job execution for this job is already running" and other times I get a NullPointerException on MapExecutionContextDao.java:130

发生这种情况的原因是因为没有我的CustomJobParameterExtractorreportingSlaveJob正在以空 JobParameters 启动。为了让 Spring Batch 创建新的作业实例,每次启动 reportingSlaveJob 时作业参数必须不同。 。使用CustomJobParameterExtractor也解决了这个问题。

关于java - Spring 批处理3.0 : StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58947946/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com