gpt4 book ai didi

java - 如何在 Spring Batch 中组合多个监听器(步骤、读取、处理、写入和跳过)

转载 作者:行者123 更新时间:2023-12-02 01:25:48 34 4
gpt4 key购买 nike

此操作的目的是通过多个步骤跟踪在 Spring Batch 作业中正在读取/处理/写入的行或项目。

我创建了一个实现这些接口(interface)的监听器:StepExecutionListener, SkipPolicy, ItemReadListener, ItemProcessListener, ItemWriteListener

@Component
public class GenericListener implements StepExecutionListener, SkipPolicy, ItemReadListener, ItemProcessListener, ItemWriteListener {
private Log logger = LogFactory.getLog(getClass());
private JobExecution jobExecution;
private int numeroProcess = 0;
private int currentReadIndex = 0;
private int currentProcessIndex = 0;
private int currentWriteIndex = 0;

@Override
public void beforeRead() throws Exception {
log.info(String.format("[read][line : %s]", currentReadIndex));
currentReadIndex++;
}
@Override
public void afterRead (Object o) throws Exception {
log.info("Ligne correct");
}
@Override
public void onReadError (Exception e) throws Exception {
jobExecution.stop();
}
@Override
public boolean shouldSkip (Throwable throwable, int i) throws SkipLimitExceededException {
String err = String.format("Erreur a la ligne %s | message %s | cause %s | stacktrace %s", numeroProcess, throwable.getMessage(), throwable.getCause().getMessage(), throwable.getCause().getStackTrace());
log.error(err);
return true;
}
@Override
public void beforeProcess (Object o) {
log.debug(String .format("[process:%s][%s][Object:%s]", numeroProcess++, o.getClass(), o.toString()));
currentProcessIndex++;
}
@Override
public void afterProcess (Object o, Object o2) { }
@Override
public void onProcessError (Object o, Exception e) {
String err = String.format("[ProcessError at %s][Object %s][Exception %s][Trace %s]", currentProcessIndex, o.toString(), e.getMessage(), e.getStackTrace());
log.error(err);
jobExecution.stop();
}
@Override
public void beforeWrite (List list) {
log.info(String .format("[write][chunk number:%s][current chunk size %s]", currentWriteIndex, list != null ? list.size() : 0));
currentWriteIndex++;
}
@Override
public void afterWrite (List list) { }
@Override
public void onWriteError (Exception e, List list) {
jobExecution.stop();
}
@Override
public void beforeStep(StepExecution stepExecution) {
jobExecution = stepExecution.getJobExecution();
currentReadIndex = 0;
currentProcessIndex = 0;
currentWriteIndex = 0;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}

作业定义(CustomJobListener是一个扩展JobExecutionListenerSupport的简单类)

public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobs;

@Bean
public Job job(CustomJobListener listener,
@Qualifier("step1") Step step1,
@Qualifier("step2") Step step2,
@Qualifier("step3") Step step3) {
return jobs.get("SimpleJobName")
.incrementer(new RunIdIncrementer())
.preventRestart()
.listener(listener)
.start(step1)
.next(step2)
.next(step3)
.build();
}
}

步骤定义(所有三个步骤具有相同的定义,仅读取/处理器/写入器发生变化)

@Component
public class StepControleFormat {
@Autowired
private StepOneReader reader;
@Autowired
private StepOneProcessor processor;
@Autowired
private StepOneWriter writer;
@Autowired
private ConfigAccess configAccess;
@Autowired
private GenericListener listener;
@Autowired
public StepBuilderFactory stepBuilderFactory;

@Bean
@JobScope
@Qualifier("step1")
public Step stepOne() throws StepException {
return stepBuilderFactory.get("step1")
.<StepOneInput, StepOneOutput>chunk(configAccess.getChunkSize())
.listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)
.faultTolerant()
.skipPolicy(listener)
.reader(reader.read())
.processor(processor.compose())
.writer(writer)
.build();
}
}

现在的问题是方法beforeStep(StepExecution stepExecution)afterStep(StepExecution stepExecution)不会被触发,但是 GenericListener 中的所有其他方法当各自的事件发生时,它们会被正确触发。

我尝试使用listener((StepExecutionListener)listener)而不是listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)但后者返回 AbstractTaskletStepBuiler然后我不能使用reader , processorwriter .

更新:我的 Spring Boot 版本是:v1.5.9.RELEASE

最佳答案

感谢 Michael Minella 的提示,我解决了这个问题:

@Bean
@JobScope
@Qualifier("step1")
public Step stepOne() throws StepException {
SimpleStepBuilder<StepOneInput, StepOneOutput> builder = stepBuilderFactory.get("step1")
.<StepOneInput, StepOneOutput>chunk(configAccess.getChunkSize())
// setting up listener for Read/Process/Write
.listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)
.faultTolerant()
// setting up listener for skipPolicy
.skipPolicy(listener)
.reader(reader.read())
.processor(processor.compose())
.writer(writer);

// for step execution listener
builder.listener((StepExecutionListener)listener);

return builder.build();
}

最后一个电话 listener方法public B listener(StepExecutionListener listener)来自StepBuilderHelper<B extends StepBuilderHelper<B>>返回 StepBuilderHelper不包含 build() 的定义方法。因此,解决方案是拆分步骤构建定义。

我不明白的是:虽然 writer方法返回 SimpleStepBuilder<I, O>其中包含此方法的定义 public SimpleStepBuilder listener(Object listener) ,编译器/IDE (IntelliJ IDEA) 正在调用 public B listener(StepExecutionListener listener)来自StepBuilderHelper<B extends StepBuilderHelper<B>> 。如果有人可以帮助解释这种行为。

此外,找到一种使用 public SimpleStepBuilder listener(Object listener) 在一次调用中连接所有监听器的方法来自SimpleStepBuilder会很有趣。

关于java - 如何在 Spring Batch 中组合多个监听器(步骤、读取、处理、写入和跳过),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56974573/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com