gpt4 book ai didi

java - FtpStreamingMessageSource - 失败时重试

转载 作者:行者123 更新时间:2023-11-30 06:39:09 25 4
gpt4 key购买 nike

我已经配置了 spring 与此 bean 的集成:

private static final Pattern FILE_PATTERN = Pattern.compile("<pattern>");

@Bean
public SessionFactory<FTPFile> ftpSessionFactory(){
DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
factory.setHost("localhost");
factory.setPort(21);
factory.setUsername("root");
factory.setPassword("123456");
factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
return new CachingSessionFactory<>(factory);
}

@Bean
public ConcurrentMetadataStore metadataStore(){
PropertiesPersistingMetadataStore store = new PropertiesPersistingMetadataStore();
store.setFileName("ftpStore.properties");
return store;
}

@Bean(destroyMethod = "close")
public DataSource selectDataSource(){
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://10.10.10.10:33306/csv");
dataSource.setUsername("root");
dataSource.setPassword("123456");
return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager(){
return new DataSourceTransactionManager(selectDataSource());
}

@Bean
public TransactionSynchronizationFactory synchronizationFactory(){
return new DefaultTransactionSynchronizationFactory(new TransactionSynchronizationProcessor() {
@Override
public void processBeforeCommit(IntegrationResourceHolder integrationResourceHolder) {
int x = 22; //???
}

@Override
public void processAfterCommit(IntegrationResourceHolder integrationResourceHolder) {
int x = 22; //???
}

@Override
public void processAfterRollback(IntegrationResourceHolder integrationResourceHolder) {
int x = 22; //???
}
});
}

@Bean
public PollerMetadata pollerMetadata(PlatformTransactionManager transactionManager){
PeriodicTrigger trigger = new PeriodicTrigger(5000);
trigger.setFixedRate(true);

MatchAlwaysTransactionAttributeSource source = new MatchAlwaysTransactionAttributeSource();
source.setTransactionAttribute(new DefaultTransactionAttribute());
TransactionInterceptor interceptor = new TransactionInterceptor(transactionManager, source);

PollerMetadata metadata = new PollerMetadata();
metadata.setTrigger(trigger);
metadata.setTransactionSynchronizationFactory(synchronizationFactory());
metadata.setAdviceChain(Collections.singletonList(interceptor));
return metadata;
}

@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller("pollerMetadata"))
public MessageSource<InputStream> ftpMessageSource(){
FtpStreamingMessageSource source = new FtpStreamingMessageSource(new FtpRemoteFileTemplate(ftpSessionFactory()));
source.setRemoteDirectory("ftp/folder");
source.setFilter(new CompositeFileListFilter<>(Arrays.asList(
new FtpRegexPatternFileListFilter(FILE_PATTERN),
acceptOnceFileListFilter()
)));
return source;
}

@Bean
public FtpPersistentAcceptOnceFileListFilter acceptOnceFileListFilter(){
FtpPersistentAcceptOnceFileListFilter filter = new FtpPersistentAcceptOnceFileListFilter(metadataStore(), "remote");
filter.setFlushOnUpdate(true);
return filter;
}

@Bean
@ServiceActivator(inputChannel = "newChannel")
public MessageHandler handler(){
return new MessageHandler(){
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
throw new MessagingException("error");
}
};
}

@Bean
public MessageChannel ftpChannel(){
return new DirectChannel();
}

@Bean
public MessageChannel newChannel(){
return new DirectChannel();
}

@Bean
public MessageChannel strChannel(){
return new DirectChannel();
}

@Bean
@Transformer(inputChannel = "ftpChannel", outputChannel = "strChannel")
public org.springframework.integration.transformer.Transformer transformer2(){
return new StreamTransformer("UTF-8");
}

@Bean
@Transformer(inputChannel = "strChannel", outputChannel = "newChannel")
public UnmarshallingTransformer transformer(){
UnmarshallingTransformer transformer = new UnmarshallingTransformer(unmarshaller());
return transformer;
}

@Bean
public Jaxb2Marshaller unmarshaller(){
Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setContextPath("com.generated.xsd");
return unmarshaller;
}

我的问题是,在抛出 new MessagingException("error"); 时,所有 ftp 文件都会保存到 ftpStore.properties 并在下次重新加载时(例如,如果 JVM失败),该文件将不再被处理。我如何确保事务已就位(也就是说,如果没有异常文件保存到 ftpStore.properties ,否则没有)?是否有一些教程可以遵循,以便从 FTP 服务器下载文件时不会出现故障?

最佳答案

有一个ResettableFileListFilter抽象来处理这个问题。

事实上,您的 FtpPersistentAcceptOnceFileListFilter 就是这样一个:

If, after synchronizing the files, an error occurs on the downstream flow processing a file, there is no automatic rollback of the filter so the failed file will not be reprocessed by default.

If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter. This will work for any ResettableFileListFilter.

XML 配置示例如下:

<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="@acceptOnceFilter.remove(payload)" />
</int:transaction-synchronization-factory>

因此,您需要分别使用适当的 DefaultTransactionSynchronizationFactoryExpressionEvaluatingTransactionSynchronizationProcessor 调整您的 synchronizationFactory

参见Recovering from Failures .

关于java - FtpStreamingMessageSource - 失败时重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44700241/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com