gpt4 book ai didi

java - 在 Spring Batch 中生成多个异步作业后如何使主线程保持等待状态

转载 作者:行者123 更新时间:2023-12-01 19:05:00 26 4
gpt4 key购买 nike

我在 Spring Batch 中使用 TaskExecutor 配置了异步作业启动器。

我已经从主线程启动了 n 个异步作业。

现在我想让 main 方法等待,直到所有 n 个作业完成。我尝试过使用 CountDownLatch 和 Phaser 但无法实现。

请问如何实现?

这里有一些类似的虚拟代码片段。

@Configuration
public class SpringBatchConfiguration {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(maxThreadCount);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}

@Bean
@Primary
public JobLauncher getJobLauncher(JobRepository jobRepo, SimpleAsyncTaskExecutor taskExecuter) {
SimpleJobLauncher jobL = new SimpleJobLauncher();
jobL.setJobRepository(jobRepo);
jobL.setTaskExecutor(taskExecuter);
return jobL;
}
}

主线程执行以下代码。

 Class JobHandler {
@Autowired
JobLauncher jobLauncher;

// Main thread executing demo method

public void demo () {
launchJob(job1, builder1);
launchJob(job2, builder2);
launchJob(job3, builder3);
launchJob(job4, builder4);
launchJob(job5, builder5);
launchJob(job6, builder6);

// Here I want to do operation when all six jobs complete successfully
// Please suggest how to do.
}

public void launchJob(Job job, JobParametersBuilder builder ) {
try {

// Asynchronous job submission
final JobExecution execution = jobLauncher.run(job, builder.toJobParameters());
}
catch(Throwable th) {
log.error("Job execution failed");
}
}
}

最佳答案

使用ThreadPoolTask​​Executor是正确的方法。此任务执行器实现提供了一个名为 WaitForTasksToCompleteOnShutdown 的 boolean 参数,您需要设置该参数才能等待作业完成。因此,您可以执行以下操作:

在任务执行器上设置WaitForTasksToCompleteOnShutdown:

@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(maxThreadCount);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}

在任务执行器上调用shutdown以等待作业完成:

Class JobHandler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;

// Main thread executing demo method

public void demo () {
launchJob(job1, builder1);
launchJob(job2, builder2);
launchJob(job3, builder3);
launchJob(job4, builder4);
launchJob(job5, builder5);
launchJob(job6, builder6);

// Here I want to do operation when all six jobs complete successfully
// Please suggest how to do.
taskExecutor.shutdown();
}

}

这是一个完整的示例:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableBatchProcessing
public class JobsConfig {

private final JobBuilderFactory jobs;

private final StepBuilderFactory steps;

public JobsConfig(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobs = jobs;
this.steps = steps;
}

@Bean
public Job job1() {
return jobs.get("job1")
.start(steps.get("step11")
.tasklet((contribution, chunkContext) -> {
System.out.println("job1");
Thread.sleep(1000);
return RepeatStatus.FINISHED;
})
.build())
.build();
}

@Bean
public Job job2() {
return jobs.get("job2")
.start(steps.get("step21")
.tasklet((contribution, chunkContext) -> {
System.out.println("job2");
Thread.sleep(2000);
return RepeatStatus.FINISHED;
})
.build())
.build();
}

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}

@Bean
public JobLauncher jobLauncher(JobRepository jobRepository, ThreadPoolTaskExecutor taskExecutor) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}

public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(JobsConfig.class);
ThreadPoolTaskExecutor taskExecutor = context.getBean(ThreadPoolTaskExecutor.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job1 = (Job) context.getBean("job1");
Job job2 = (Job) context.getBean("job2");
jobLauncher.run(job1, new JobParameters());
jobLauncher.run(job2, new JobParameters());
System.out.println("submitted 2 jobs, waiting for completion..");
taskExecutor.shutdown();
}

}

它打印:

submitted 2 jobs, waiting for completion..
job1
job2

关于java - 在 Spring Batch 中生成多个异步作业后如何使主线程保持等待状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59577125/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com