gpt4 book ai didi

java - org.springframework.batch.core.JobExecutionException : Partition handler returned an unsuccessful step

转载 作者:太空宇宙 更新时间:2023-11-04 09:28:50 26 4
gpt4 key购买 nike

我正在学习 Spring Batch,并且能够创建简单的单步应用程序( github repo link )

此应用程序包含执行以下操作的作业:
1.从csv文件中读取人员
2. 小写他们的名字
3.将它们保存到数据库

现在我想学习分区功能,所以我添加了以下分区器:

@Component
public class MyPartitioner implements Partitioner {

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
for (int k = 0; k < gridSize; k++) {
ExecutionContext context = new ExecutionContext();
context.putString("keyName", "key_" + k); //Depends on what logic you want to use to split
map.put("PARTITION_KEY" + k, context);
}
return map;
}
}

我的配置如下所示:

@Bean
public Job job() {
return jobBuilderFactory.get("myJob")
.incrementer(new RunIdIncrementer())
.flow(demoPartitionStep())
.end()
.build();
}

private Step demoPartitionStep() {
return stepBuilderFactory.get("demoPartitionStep")
.partitioner("demoPartitionStep", myPartitioner)
.gridSize(21)
.step(csvToDataBaseStep())
.taskExecutor(jobTaskExecutor())
.build();
}

private Step csvToDataBaseStep() {
return stepBuilderFactory.get("csvToDatabaseStep")
.<Person, Person>chunk(10)
.reader(csvPersonReader())
.processor(toLowerCasePersonProcessor)
.writer(dbPersonWriter)
.build();

}

public FlatFileItemReader csvPersonReader() {
return new FlatFileItemReaderBuilder()
.name("csvPersonReader")
.resource(csvResource)
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();

}

@Bean
public TaskExecutor jobTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// there are 21 sites currently hence we have 21 threads
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(25);
taskExecutor.setThreadGroupName("custom-executor");
taskExecutor.afterPropertiesSet();
return taskExecutor;
}

当我启动应用程序时,我在日志中看到以下内容:

2019-08-05 19:25:22.303 ERROR 24100 --- [bTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Encountered an error executing step csvToDatabaseStep in job myJob

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [class path resource [users.csv]]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.io.IOException: Stream closed
at java.io.BufferedReader.ensureOpen(BufferedReader.java:122) ~[na:1.8.0_111]
at java.io.BufferedReader.readLine(BufferedReader.java:317) ~[na:1.8.0_111]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_111]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
... 26 common frames omitted

以及以下内容:

2019-08-05 19:25:22.319 ERROR 24100 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step demoPartitionStep in job myJob

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at com.sun.proxy.$Proxy77.run(Unknown Source) [na:na]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:206) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:180) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:167) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:162) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:779) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:763) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at spring.boot.hello.world.MyApplication.main(MyApplication.java:9) [main/:na]

我注意到一些数据已成功插入数据库。我的csv文件包含500个人,但到sql数据库中可能会插入210行或332行甚至600行(我在将 block 大小设置为100时看到它)

如何实现正确的分区?我做错了什么?

更新

我尝试用 @StepScope 标记 csvPersonReader 并且错误消失了,但是

rowCountInDatabaseTable=gridSize * rowCountInCsvFile

我仍在寻找解决方案

最佳答案

实际上,我自己没有使用过partioner,但我也许可以给你一些提示。

您遇到的第一个异常(流已关闭)是由于每个从属进程使用相同的读取器实例。显然,第一个完成的从属进程关闭了读取器,从那一刻起,其他从属进程尝试从关闭的流中读取数据。

您使用 StepScope-Annotation 解决了这个问题,这是解决此问题的正确方法。

分区方法的问题是,您负责对读取的数据进行分区。

您所做的只是创建一个具有 20 个从属进程的分区程序,并且这些从属进程中的每个进程都会读取整个文件。因此,您的数据库包含文件中的每个条目 20 次。

您应该做的是使用适当的“上下文属性”配置每个步骤实例。基于这些上下文属性,步骤实例知道它应该处理哪些行(例如开始行和结束行)。或者,您也可以将原始文件拆分为 20 个具有不同名称的文件,并为每个实例提供另一个文件名。

我找到了两个示例来解释这一点,一个是从数据库读取,另一个是从文件读取:

https://www.mkyong.com/spring-batch/spring-batch-partitioning-example/

https://www.baeldung.com/spring-batch-partitioner

关于java - org.springframework.batch.core.JobExecutionException : Partition handler returned an unsuccessful step,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57362728/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com