gpt4 book ai didi

java - Spring Batch - 如何根据上一步中创建的参数生成并行步骤

转载 作者:行者123 更新时间:2023-12-01 09:54:59 25 4
gpt4 key购买 nike

简介

我正在尝试使用在tasklet 中创建的作业参数来创建执行tasklet 后的步骤。

tasklet 尝试查找一些文件 (findFiles()),如果找到一些文件,它将文件名保存到字符串列表中。

在小任务中,我传递的数据如下: chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);

下一步是一个并行流程,其中每个文件都会执行一个简单的读取器-处理器-编写器步骤(如果您对我如何到达那里感兴趣,请参阅我之前的问题:Spring Batch - Looping a reader/processor/writer step)

在构建作业 readFilesJob() 时,最初会使用“假”文件列表创建一个流,因为只有在执行了 tasklet 后,才知道真正的文件列表。

问题

如何配置我的作业,以便首先执行 tasklet,然后使用从 tasklet 生成的文件列表执行并行流?

我认为这可以归结为在运行时的正确时刻加载正确数据的文件名列表......但如何实现?

重现

这是我的简化配置:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

private static final String FLOW_NAME = "flow1";
private static final String PLACE_HOLDER = "empty";

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

public List<String> files = Arrays.asList(PLACE_HOLDER);

@Bean
public Job readFilesJob() throws Exception {
List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());

FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);

Flow flow = flowBuilder
.start(findFiles())
.next(createParallelFlow(steps))
.build();

return jobBuilderFactory.get("readFilesJob")
.start(flow)
.end()
.build();
}

private static Flow createParallelFlow(List<Step> steps){
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(steps.size());

List<Flow> flows = steps.stream()
.map(step ->
new FlowBuilder<Flow>("flow_" + step.getName())
.start(step)
.build())
.collect(Collectors.toList());

return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor)
.add(flows.toArray(new Flow[flows.size()]))
.build();
}

private Step createStep(String fileName){
return stepBuilderFactory.get("readFile" + fileName)
.chunk(100)
.reader(reader(fileName))
.writer(writer(filename))
.build();
}

private FileFinder findFiles(){
return new FileFinder();
}
}

研究

来自How to safely pass params from Tasklet to step when running parallel jobs的问答建议在读取器/写入器中使用这样的构造:

@Value("#{jobExecutionContext[filePath]}") 字符串文件路径

但是,由于在 createParallelFlow() 方法中创建步骤的方式,我真的希望可以将文件名作为字符串传递给读取器/写入器。因此,即使该问题的答案可能是我的问题的解决方案,但这并不是理想的解决方案。但如果我错了,请不要阻止纠正我。

结束

我使用文件名示例来更好地阐明问题。我的问题实际上不是从目录中读取多个文件。我的问题实际上归结为在运行时生成数据并将其传递到下一个动态生成的步骤的想法。

编辑:

添加了 fileFinder 的简化 tasklet。

@Component
public class FileFinder implements Tasklet, InitializingBean {

List<String> fileNames;

public List<String> getFileNames() {
return fileNames;
}

@PostConstruct
public void afterPropertiesSet() {
// read the filenames and store dem in the list
fileNames.add("sample-data1.csv");
fileNames.add("sample-data2.csv");
}

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// Execution of methods that will find the file names and put them in the list...
chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);
return RepeatStatus.FINISHED;
}
}

最佳答案

我不确定我是否正确理解了您的问题,但据我所知,在动态构建作业之前,您需要拥有包含文件名的列表。

你可以这样做:

@Component
public class MyJobSetup {
List<String> fileNames;

public List<String> getFileNames() {
return fileNames;
}

@PostConstruct
public void afterPropertiesSet() {
// read the filenames and store dem in the list
fileNames = ....;
}
}

之后,您可以将此 Bean 注入(inject)到 JobConfiguration Bean 中

@Configuration
@EnableBatchProcessing
@Import(MyJobSetup.class)
public class BatchConfiguration {

private static final String FLOW_NAME = "flow1";
private static final String PLACE_HOLDER = "empty";

@Autowired
private MyJobSetup jobSetup; // <--- Inject
// PostConstruct of MyJobSetup was executed, when it is injected

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

public List<String> files = Arrays.asList(PLACE_HOLDER);

@Bean
public Job readFilesJob() throws Exception {
List<Step> steps = jobSetUp.getFileNames() // get the list of files
.stream() // as stream
.map(file -> createStep(file)) // map...
.collect(Collectors.toList()); // and create the list of steps

关于java - Spring Batch - 如何根据上一步中创建的参数生成并行步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37310658/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com