gpt4 book ai didi

java - org.springframework.batch.item.ItemStreamException : Failed to initialize the reader in JdbcPagingItemReader

转载 作者:行者123 更新时间:2023-12-02 11:01:16 25 4
gpt4 key购买 nike

我使用注解在 Spring Boot 中实现了批处理流程,并且它可以正常工作。我使用 SimpleAsyncTaskExecutor,并配合读取器、处理器和写入器:读取器从数据库读取数据,处理器对数据进行验证,写入器再把结果插入数据库。当一个作业已经启动后,如果我再提交另一个作业来运行,批处理就会失败。所有参数都是从 Web 服务动态(dynamically)获取的。

// Hands the job and its parameters to the job launcher to start an execution.
simpleJobLauncher.run(itemDataQualtiyReportJob,jobParameters);

/**
 * Creates the {@link JobLauncher} used to start batch jobs.
 *
 * <p>The launcher is backed by {@code jobRepository} and is given a
 * {@link SimpleAsyncTaskExecutor}, so jobs are started on executor threads rather than on the
 * caller's thread.
 *
 * @return the initialized launcher
 * @throws IllegalStateException if the launcher rejects its configuration during
 *         {@code afterPropertiesSet()}
 */
@Bean
public JobLauncher jobLauncher() {
    SimpleJobLauncher launcher = new SimpleJobLauncher();
    launcher.setJobRepository(jobRepository);
    launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    try {
        launcher.afterPropertiesSet();
    } catch (Exception e) {
        // Fail fast: a launcher that did not initialize must not be published as a bean,
        // and the original cause has to stay attached for diagnosis.
        throw new IllegalStateException("Failed to initialize the JobLauncher", e);
    }
    return launcher;
}

/**
 * Provides the executor used by the step for multi-threaded chunk processing.
 *
 * <p>Threads are created with the name prefix {@code "DQReports"} and the number of
 * concurrently running tasks is capped at {@code noOfThreads}.
 *
 * @return the configured executor
 */
@Bean
public TaskExecutor taskExecutor() {
    final SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("DQReports");
    executor.setConcurrencyLimit(noOfThreads);
    return executor;
}


/**
 * Defines the chunk-oriented step named "step".
 *
 * <p>Items come from {@code cimm2ItemReader()}, are converted from {@code ItemDTO} to
 * {@code ItemResult} by {@code cimm2ItemProcessor()}, and are written by
 * {@code cimm2ItemResultWriter()} in chunks of {@code pageSize}. The step is fault tolerant:
 * {@link FileNotFoundException} is skippable, up to {@code skipBadRecordsLimit} skips.
 *
 * <p>NOTE(review): a task executor is attached, so the reader, processor and writer returned by
 * the bean methods are presumably shared between the executor's threads — confirm that each of
 * them is safe to use that way.
 *
 * @return the configured step
 */
@Bean
public Step step(){
return stepBuilderFactory.get("step")
.<ItemDTO,ItemResult>chunk(pageSize)
// Each component is registered together with its own listener.
.reader(cimm2ItemReader()).listener(CommonItemReadListener())
.processor(cimm2ItemProcessor()).listener(CommonItemProcessListener())
.writer(cimm2ItemResultWriter())
.faultTolerant()
.skipLimit(skipBadRecordsLimit) //default is set to 0
.skip(FileNotFoundException.class)
// Runs the step's chunks on the executor from taskExecutor().
.taskExecutor(taskExecutor())
.listener(ItemDataQualityStepListener())
.build();
}

/**
 * Builds the paging reader that supplies {@code ItemDTO} rows to the step.
 *
 * <p>The reader is wired with the shared {@code dataSource}, a {@code Cimm2ItemDTOMapper} for
 * row mapping, the provider from {@code queryProvider()}, and a page size of {@code pageSize}.
 *
 * @return the configured reader
 */
@Bean
public ItemReader<ItemDTO> cimm2ItemReader() {
    final Cimm2ItemReader reader = new Cimm2ItemReader(jdbcTemplate());

    // Plain configuration values first, then the collaborators.
    reader.setPageSize(pageSize);
    reader.setDataSource(dataSource);
    reader.setRowMapper(new Cimm2ItemDTOMapper());
    reader.setQueryProvider(queryProvider());

    return reader;
}


/**
 * Creates the processor that turns each {@code ItemDTO} into an {@code ItemResult}.
 *
 * @return a {@code Cimm2ItemProcessor} built on {@code jdbcTemplate()}
 */
@Bean
public ItemProcessor<ItemDTO, ItemResult> cimm2ItemProcessor() {
    final Cimm2ItemProcessor processor = new Cimm2ItemProcessor(jdbcTemplate());
    return processor;
}

/**
 * Creates the writer that receives the {@code ItemResult} objects produced by the processor.
 *
 * @return a {@code Cimm2ItemResultWriter} built on {@code jdbcTemplate()} and
 *         {@code objectMapper()}
 */
@Bean
public ItemWriter<ItemResult> cimm2ItemResultWriter() {
    final Cimm2ItemResultWriter writer =
            new Cimm2ItemResultWriter(jdbcTemplate(), objectMapper());
    return writer;
}


public class Cimm2ItemReader extends JdbcPagingItemReader<ItemDTO>{
private ExecutionContext stepExecutionContext;
private JobParameters jobParameters;
private DataSource dataSource;
private JdbcTemplate jdbcTemplate;
private final Logger log=LoggerFactory.getLogger(this.getClass());

public Cimm2ItemReader(JdbcTemplate jdbcTemplate) {
super();
this.jdbcTemplate = jdbcTemplate;
}

protected void doReadPage() {
log.info("--------------------------------Inside Cimm2ItemReader doReadPage Method");
super.doReadPage();
log.info("--------------------------------Exit Cimm2ItemReader doReadPage Method");
}


@BeforeStep
public void beforeStep(StepExecution stepExecution) {
jobParameters = stepExecution.getJobParameters();
stepExecutionContext = stepExecution.getExecutionContext();

//update paging query provider with sql
SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
factory.setDatabaseType("ORACLE");
factory.setDataSource(dataSource);

String selectClause = getSelectClause();
String fromClause = getFromClause();
String whereClause = getWhereClause();
String sortField = "ITEM_ID";
factory.setSelectClause(selectClause);
factory.setFromClause(fromClause);
factory.setWhereClause(whereClause);
Map<String, Order> sortKeys = new HashMap<String, Order>();
sortKeys.put(sortField, Order.DESCENDING);
factory.setSortKeys(sortKeys);
PagingQueryProvider pqp = null;

try {
pqp = factory.getObject();
super.setQueryProvider(pqp);
super.afterPropertiesSet();
//super.setSaveState(false);
} catch (Exception e) {
log.error("Exception in Cimm Reader Before Step"+e.getMessage());
throw new ItemDataQualityReportException("Unable to prepare the "
+ "Item master sql to read the item data.",e);
}
}
}

//---- 以下是 jar 文件(Spring Batch 库)中的代码,即异常堆栈中出现的 open / doOpen 方法

  // Library excerpt; per the stack trace this appears to be JdbcPagingItemReader.open.
  // When state saving is enabled it restores startAfterValues from the execution context
  // (falling back to an empty map) and then delegates to the parent open().
  @Override
@SuppressWarnings("unchecked")
public void open(ExecutionContext executionContext) {
if (isSaveState()) {
startAfterValues = (Map<String, Object>) executionContext.get(getExecutionContextKey(START_AFTER_VALUE));

// Nothing stored under the key: start with an empty map.
if(startAfterValues == null) {
startAfterValues = new LinkedHashMap<>();
}
}

super.open(executionContext);
}

// Library excerpt; per the stack trace this appears to be
// AbstractItemCountingItemStreamItemReader.open.
// Opens the reader via doOpen() and, when state saving is enabled, restores the item
// counters from the execution context and jumps to the stored position.
// Throws ItemStreamException if doOpen() or jumpToItem() fails.
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
super.open(executionContext);
// Note that doOpen() runs before the isSaveState() check below, so a failure in doOpen()
// surfaces as "Failed to initialize the reader" whatever the saveState setting is.
try {
doOpen();
}
catch (Exception e) {
throw new ItemStreamException("Failed to initialize the reader", e);
}
// Without state saving there is nothing to restore.
if (!isSaveState()) {
return;
}

if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
}

// Prefer the count stored in the context; otherwise keep a positive current count.
int itemCount = 0;
if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
}
else if(currentItemCount > 0) {
itemCount = currentItemCount;
}

// Only reposition when there is a stored position that is still below the maximum.
if (itemCount > 0 && itemCount < maxItemCount) {
try {
jumpToItem(itemCount);
}
catch (Exception e) {
throw new ItemStreamException("Could not move to stored position on restart", e);
}
}

currentItemCount = itemCount;

}

// Library excerpt; per the stack trace this appears to be AbstractPagingItemReader.doOpen.
// Marks the reader as opened and refuses to open it a second time: the assertion below is
// the source of the "Cannot open an already opened ItemReader" IllegalStateException that
// appears as the root cause in the reported stack trace.
@Override
protected void doOpen() throws Exception {

Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
initialized = true;

}

提前致谢。

下面是异常(exception)情况。

2018-07-12 19:21:23.015 ERROR 24304 — [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job itemDataQualtiyReport

org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:149)
at org.springframework.batch.item.database.JdbcPagingItemReader.open(JdbcPagingItemReader.java:260)
at org.springframework.batch.item.database.JdbcPagingItemReader$$FastClassBySpringCGLIB$$42c8e250.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.unilog.cimm.reports.batch.reader.Cimm2ItemReader$$EnhancerBySpringCGLIB$$6a6d34be.open()
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.item.ChunkMonitor.open(ChunkMonitor.java:114)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:66)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:308)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:141)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first
at org.springframework.util.Assert.state(Assert.java:73)
at org.springframework.batch.item.database.AbstractPagingItemReader.doOpen(AbstractPagingItemReader.java:133)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:146)
… 23 common frames omitted

最佳答案

问题是由于您的读取器(reader)被并发使用造成的。JdbcPagingItemReader 的 Javadoc 中写道:

The implementation is thread-safe in between calls to open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available).

因此,将读取器设置为 saveState=false(例如调用 setSaveState(false))之后,应该就没有问题了。

关于java - org.springframework.batch.item.ItemStreamException : Failed to initialize the reader in JdbcPagingItemReader,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51308687/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com