
java - Spring Batch step after a JMS Reader/Processor/Writer step is not triggered

Reposted. Author: 行者123. Updated: 2023-12-01 16:35:12

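Our Spring Batch job needs two steps.

Step 1: Dequeue a message from JMS, process it, and write the output data to a file.
Step 2: A partitioned step with 2 slave steps. Each slave step applies a different algorithm to the output data of Step 1.

Problem: Step 2 is never invoked. We tried attaching a step listener to Step 1 (and to the writer), but it never executes. Step 1 appears to stay in a running state indefinitely, so Step 2 (the partitioned step) is never executed.

Observation: When the JMSReader in Step 1 is replaced with an ordinary (file/database) reader, control passes to Step 2.

Note: The call firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE); is required because we need to dequeue messages one by one, continuously.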

```java
@Bean
public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() {
    ActiveMQConnectionFactory amqConnectionFactory =
            new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    return new TransactionAwareConnectionFactoryProxy(amqConnectionFactory);
}

@Bean
public ActiveMQQueue defaultQueue() {
    return new ActiveMQQueue("batch-test");
}

@Bean
@DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" })
public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue,
        TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) {
    JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory);
    firstQueueTemplate.setDefaultDestination(defaultQueue);
    firstQueueTemplate.setSessionTransacted(true);
    // receive() blocks (practically) forever when the queue is empty
    firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE);
    return firstQueueTemplate;
}

// Job: ingestion step first, then the partitioned step
@Bean(name = "partitionerJob")
public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitionerJob")
            .start(ingestionstep())
            .next(partitionStep())
            .build();
}

// Step 1: read from JMS, process, write to a file
@Bean
public Step ingestionstep() throws UnexpectedInputException, MalformedURLException, ParseException {
    System.out.println("ingestionstep forming");
    return steps.get("ingsetionstep")
            .<SPDRIngestScanBO, SPDRIngestScanBO>chunk(1)
            .reader(jmsItemReader())
            .processor(ingestionProcessor())
            .writer(ingestionwriter())
            .listener(new StepExecutionListener() {

                @Override
                public void beforeStep(StepExecution stepExecution) {
                    // nothing to do before the step
                }

                @Override
                public ExitStatus afterStep(StepExecution stepExecution) {
                    System.out.println("step exit status :" + stepExecution.getExitStatus());
                    return null;
                }
            })
            // .listener(promotionListener())
            .build();
}

// Step 2: partitioned step with two slave steps
@Bean
@StepScope
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
    System.out.println(" Inside partitionStep method ");
    return steps.get("partitionStep")
            .partitioner("partitionscans", partitioner(null))
            .gridSize(2)
            .step(scanStep())
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public JmsItemReader<SPDRIngestScanBO> jmsItemReader() {
    JmsItemReader<SPDRIngestScanBO> jmsItemReader = new JmsItemReader<>();
    jmsItemReader.setJmsTemplate(jmsTemplate);
    jmsItemReader.setItemType(SPDRIngestScanBO.class);
    return jmsItemReader;
}

@Bean
public SPDRIngestionStepProcessor ingestionProcessor() {
    return new SPDRIngestionStepProcessor();
}

@Bean
public SPDRIngestionStepWriter ingestionwriter() {
    return new SPDRIngestionStepWriter();
}

@Bean
@StepScope
public ModelsPartitioner partitioner(@Value("#{jobExecutionContext[models]}") List<SPDRScanModelBO> models) {
    ModelsPartitioner partitioner = new ModelsPartitioner();
    partitioner.setModels(models);
    System.out.println("----partitioner----");
    return partitioner;
}

// Slave step executed once per partition
@Bean
@StepScope
public Step scanStep() throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("scanstep")
            .<SPDRScanModelBO, SPDRScanResultBO>chunk(1)
            .reader(scanStepReader(null))
            .processor(scanStepProcessor())
            .writer(scanStepWriter())
            .build();
}

@Bean
@StepScope
public SPDRScanStepReader scanStepReader(@Value("#{stepExecutionContext[model]}") SPDRScanModelBO scanModelBO) {
    System.out.println("----scanStepReader----");
    SPDRScanStepReader scanStepReader = new SPDRScanStepReader();
    scanStepReader.setScanModelBO(scanModelBO);
    return scanStepReader;
}

@Bean
@StepScope
public SPDRScanStepProcessor scanStepProcessor() {
    return new SPDRScanStepProcessor();
}

@Bean
@StepScope
public SPDRScanStepWriter scanStepWriter() {
    return new SPDRScanStepWriter();
}

@Bean
@StepScope
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(2);
    taskExecutor.setCorePoolSize(2);
    taskExecutor.setQueueCapacity(2);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
```

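Best answer

In Spring Batch, when you run two steps sequentially, say step 1 followed by step 2, step 2 is executed only after step 1 has completed.

In your case, step 1 reads from the JMS queue with receiveTimeout = Long.MAX_VALUE, so step 1 will not complete until that timeout elapses, and therefore step 2 will never start.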
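To make the mechanism concrete, here is a minimal plain-Java sketch with no Spring or JMS involved. It relies on two behaviours: a chunk-oriented step ends when its reader returns null, and JmsItemReader returns null when the underlying JmsTemplate receive call times out. The class ReceiveTimeoutSketch, its methods, and the 100 ms timeout are hypothetical names and values chosen for illustration; a BlockingQueue stands in for the JMS queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model (assumption: not the real Spring Batch classes) of a
// two-step sequential job whose first step reads from a blocking queue.
class ReceiveTimeoutSketch {

    // Stand-in for the reader: waits up to timeoutMillis for a message and
    // returns null on timeout, which the step loop treats as end of input.
    static String read(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Runs "step 1" until the reader returns null, then starts "step 2".
    static List<String> runJob(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> log = new ArrayList<>();
        String item;
        while ((item = read(queue, timeoutMillis)) != null) {
            log.add("step1 processed " + item);
        }
        log.add("step1 completed");
        log.add("step2 started");
        return log;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        // With a finite timeout the loop ends once the queue stays empty.
        // With Long.MAX_VALUE the poll would effectively never return.
        runJob(queue, 100).forEach(System.out::println);
    }
}
```

In the question's configuration, the analogous change would be passing a finite value to firstQueueTemplate.setReceiveTimeout(...). The trade-off is that step 1 then ends the first time the queue stays empty for that long, so messages arriving later are only picked up by a subsequent job run.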

Regarding "java - Spring Batch step after a JMS Reader/Processor/Writer step is not triggered", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/61968218/
