gpt4 book ai didi

java - 带有 SimpleAsyncTaskExecutor 的 Spring Batch 未保存到数据库

转载 作者:行者123 更新时间:2023-12-01 16:58:22 27 4
gpt4 key购买 nike

我有一个简单的 Spring Boot 应用程序,其中一个 enpoint 通过 JobLauncher bean 中配置的 SimpleAsyncTaskExecutor 异步调用 Spring Batch 作业。

Spring Batch 作业异步启动并且工作正常,但没有任何内容保存到数据库中。

如果我删除 SimpleAsyncTaskExecutor,数据就会保存。

这是我的 BatchConfigurer。这里我使用 SimpleAsyncTaskExecutor 配置 JobLauncher 。如果我删除行 simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());//(1) 数据被保存。

@Component
public class CustomBatchConfiguration implements BatchConfigurer {

private static final Log LOGGER = LogFactory.getLog(CustomBatchConfiguration.class);

@Autowired
private BatchProperties properties;

@Autowired
private DataSource dataSource;

@Autowired
private EntityManagerFactory entityManagerFactory;

private PlatformTransactionManager transactionManager;

private JobRepository jobRepository;

private JobLauncher jobLauncher;

private JobExplorer jobExplorer;

/**
* Registers {@link JobRepository} bean.
*/
@Override
public JobRepository getJobRepository() {
return this.jobRepository;
}

/**
* Registers {@link PlatformTransactionManager} bean.
*/
@Override
public PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
}

/**
* Registers {@link JobLauncher} bean.
*/
@Override
public JobLauncher getJobLauncher() {
return this.jobLauncher;
}

/**
* Registers {@link JobExplorer} bean. This bean is actually created in
* {@link BatchConfig}.
*/
@Override
public JobExplorer getJobExplorer() throws Exception {
return this.jobExplorer;
}

/**
* Initializes Spring Batch components.
*/
@PostConstruct
public void initialize() {
try {
this.transactionManager = createTransactionManager();
this.jobRepository = createJobRepository();
this.jobLauncher = createJobLauncher();
this.jobExplorer = createJobExplorer();
} catch (Exception ex) {
throw new IllegalStateException("Unable to initialize Spring Batch", ex);
}
}

private JobExplorer createJobExplorer() throws Exception {
JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
jobExplorerFactoryBean.setDataSource(this.dataSource);
String tablePrefix = this.properties.getTablePrefix();

if (StringUtils.hasText(tablePrefix)) {
jobExplorerFactoryBean.setTablePrefix(tablePrefix);
}

jobExplorerFactoryBean.afterPropertiesSet();
return jobExplorerFactoryBean.getObject();
}

private JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();

simpleJobLauncher.setJobRepository(getJobRepository());
simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1)
simpleJobLauncher.afterPropertiesSet();

return simpleJobLauncher;
}

private JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();

jobRepositoryFactoryBean.setDatabaseType("db2");
jobRepositoryFactoryBean.setDataSource(this.dataSource);

if (this.entityManagerFactory != null) {
LOGGER.warn("JPA does not support custom isolation levels, so locks may not be taken when launching Jobs");
jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
}

String tablePrefix = this.properties.getTablePrefix();
if (StringUtils.hasText(tablePrefix)) {
jobRepositoryFactoryBean.setTablePrefix(tablePrefix);
}

jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
jobRepositoryFactoryBean.afterPropertiesSet();

return jobRepositoryFactoryBean.getObject();
}

private PlatformTransactionManager createTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}

}

这是我的 BatchConfiguration。

@Autowired(required = true)
private MyItemReader myItemReader;

@Autowired(required = true)
private MyItemProcessor myItemProcessor;

@Autowired(required = true)
private MyItemWriter myItemWriter;


@Bean
public Step myStep(TaskExecutor taskExecutor) {

return stepBuilderFactory.get("myStepName")
.<SomeWrapper, SomeWrapper>chunk(
1)
.reader(myItemReader)
.processor(myItemProcessor).writer(myItemWriter)

.build();
}

@Bean(name = "myJob")
public Job myJob(Step myStep) {
return jobBuilderFactory.get("myJobName").incrementer(new RunIdIncrementer())
.flow(myStep).end().build();
}

我错过了什么吗?

提前致谢

最佳答案

我执行了以下操作来解决我的问题:

  1. CustomBatchConfiguration 中,我更改了以下内容

    private PlatformTransactionManager createTransactionManager() {        
    return new DataSourceTransactionManager(dataSource);
    }

private PlatformTransactionManager createTransactionManager() {
if (this.entityManagerFactory != null) {
return new JpaTransactionManager(this.entityManagerFactory);
}

return new DataSourceTransactionManager(this.dataSource);
}

我使用了spring JpaTransactionManager。

  • 我将 JobLauncher TaskExecutor 更改为

    simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
  • ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
    threadPoolExecutor.setMaxPoolSize(springBatchJobThreadPollMaxSize);
    threadPoolExecutor.afterPropertiesSet();

    TaskExecutor taskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(threadPoolExecutor);

    创建更强大的线程池并在它们之间共享 spring 安全上下文。

  • 我使用了 Tasklet,而不是面向 Spring Batch block 的解决方案。 Here详细信息。
  • 关于java - 带有 SimpleAsyncTaskExecutor 的 Spring Batch 未保存到数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61553610/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com