gpt4 book ai didi

带有 Spring Boot 的 Spring 批处理在使用 AsyncItemProcessor 的子进程之前终止

转载 作者:行者123 更新时间:2023-12-02 17:35:00 36 4
gpt4 key购买 nike

我将 Spring Batch 与 AsyncItemProcessor 一起使用,但出现了异常行为。让我先展示代码:

按照 Spring Batch project 上所示的简单示例进行操作:

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
//SpringApplication.run(PerfilEletricoApp.class, args);
}
}

-- 编辑

如果我只是让主进程休眠,请给 slf4j 几秒钟的时间来刷新日志,一切都会按预期进行。

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {

public static void main(String[] args) throws Exception {// NOSONAR
//System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);

Thread.sleep(1000 * 5);
System.exit(SpringApplication.exit(context));
}

-- 结束编辑

我正在读取一个带有字段的文本文件,然后使用 AsyncItemProcessor 进行多线程处理,其中包括 URL 上的 Http GET 以获取一些数据,我还使用 NoOpWriter 对写部分。我将 GET 的结果保存在作业的处理器部分(使用 log.trace/log.warn)。

@Configuration
public class HttpClientConfigurer {
// [... property and configs omitted]
@Bean
public CloseableHttpClient createHttpClient() {
// ... creates and returns a poolable http client etc
}
}

至于工作:

@Configuration
public class BatchJobConfigurer {

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Value("${async.tps:10}")
private Integer tps;

@Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
private String sourceDir;

@Bean
public ItemReader<String> reader() {
MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
return reader;
}

@Bean
public ItemReader<String> flatItemReader() {
FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new DefaultLineMapper<String>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "sample-field-001"});
}});
setFieldSetMapper(new SimpleStringFieldSetMapper<>());
}});
return itemReader;
}


@Bean
public ItemProcessor asyncItemProcessor(){
AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(processor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}

@Bean
public ItemProcessor<String,OiPaggoResponse> processor(){
return new PerfilEletricoItemProcessor();
}

/**
* Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
* @return a NoOpItemWriter<OiPaggoResponse>
*/
@Bean
public ItemWriter<OiPaggoResponse> writer() {
return new NoOpItemWriter<>();
}

@Bean
protected Step step1() throws Exception {
/*
Problem starts here, If Use the processor() everything ends nicely, but if I insist on the asyncItemProcessor(), the job ends and the logs from processor are not stored on the disk.
*/
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.build();
}

@Bean
public Job job() throws Exception {
return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
}

@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(tps);
executor.setMaxPoolSize(tps);
executor.setQueueCapacity(tps * 1000);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("AsyncExecutor-");
return executor;
}
}

-- 使用 AsyncItemWriter 更新(工作版本)

   /*Wrapped Writer*/
@Bean
public ItemWriter asyncItemWriter(){
AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}

/*AsyncItemWriter defined on the steps*/
@Bean
protected Step step1() throws Exception {
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}

--关于为什么 AsyncItemProcessor 在向上下文发送 OK-Completed 信号之前不等待所有子项完成的任何想法?

最佳答案

问题是 AsyncItemProcessor 正在创建无人等待的 Future。将您的 NoOpItemWriter 包装在 AsyncItemWriter 中,以便有人在等待 Future。这将使作业按预期完成。

关于带有 Spring Boot 的 Spring 批处理在使用 AsyncItemProcessor 的子进程之前终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27744909/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com