gpt4 book ai didi

java - 从 Spring Integration 启动 Spring Batch 作业

转载 作者:行者123 更新时间:2023-11-30 06:29:16 27 4
gpt4 key购买 nike

我需要从远程 SFTP 服务器下载文件并使用 Spring Batch 处理它们。我已经使用 Spring Integration 实现了下载文件的代码。但我无法从 Spring 集成组件启动 Spring Batch 作业。我有以下代码:

    @Autowired
private JobLauncher jobLauncher;

public String OUTPUT_DIR = "temp_dir";

@Value("${sftp.remote.host}")
private String sftpRemoteHost;

@Value("${sftp.remote.user}")
private String sftpUsername;

@Value("${sftp.remote.password}")
private String sftpPassword;

@Value("${sftp.remote.folder}")
private String sftpFolder;

@Bean
public DefaultSftpSessionFactory sftpSessionFactory() {
final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(sftpRemoteHost);
factory.setAllowUnknownKeys(true);
factory.setUser(sftpUsername);
factory.setPassword(sftpPassword);
return factory;
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
final SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(sftpFolder);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.csv"));
return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
final SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File(OUTPUT_DIR));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<>());
return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
final FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(true);
handler.setOutputChannelName("parse-csv-channel");
return handler;
}

@ServiceActivator(inputChannel = "parse-csv-channel", outputChannel = "job-channel")
public JobLaunchRequest adapt(final File file) throws Exception {
final JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", file.getAbsolutePath()).toJobParameters();
return new JobLaunchRequest(batchConfiguration.job(), jobParameters);
}

@ServiceActivator(inputChannel = "job-channel", outputChannel = "finish")
public JobLaunchingMessageHandler jobHandler(JobLaunchRequest request) throws JobExecutionException {
return new JobLaunchingMessageHandler(jobLauncher);//.launch(request);
}

@ServiceActivator(inputChannel = "finish")
public void finish() {
System.out.println("FINISH");
}

但这不起作用(最后一个方法 adapt 中出现错误),因为找不到 File 类型的 bean。我无法将这两部分放在一起。如何连线集成和批处理?

最佳答案

您只需从 adapt() 方法中删除 @Bean 注释即可。如果我们真的构建 MessageHandler bean,例如 JobLaunchingMessageHandler 来接受 JobLaunchRequest 负载: https://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#launching-batch-jobs-through-messages ,我们需要 @Bean

请参阅引用手册中有关消息传递注释的更多信息:https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/configuration.html#annotations_on_beans

更新

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
final FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(true);
handler.setOutputChannelName("parse-csv-channel");
return handler;
}

@ServiceActivator(inputChannel = "parse-csv-channel", outputChannel = "job-channel")
public JobLaunchRequest adapt(final File file) throws Exception {
final JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", file.getAbsolutePath()).toJobParameters();
return new JobLaunchRequest(batchConfiguration.job(), jobParameters);
}

@Bean
@ServiceActivator(inputChannel = "job-channel")
public JobLaunchingGateway jobHandler() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
jobLaunchingGateway.setOutputChannelName("finish");
return jobLaunchingGateway;
}

关于java - 从 Spring Integration 启动 Spring Batch 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46469655/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com