gpt4 book ai didi

Java Config 入站文件适配器事务管理

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:57:05 27 4
gpt4 key购买 nike

我有一个简单的 IntegrationFlow,我正试图通过事务管理来保护它。从高层次上看,它会轮询目录中的文件,将该文件 gzip 到磁盘上的新位置,然后将第二个文件上传到 S3。如果提交,我想将两条路径发布到一个 channel (它们将被删除);如果它回滚,我想将它们发布到不同的 channel (它们将被存档)。

我的(缩写的)尝试配置如下所示:

@Bean
public IntegrationFlow fromFileFlow(@Qualifier("inboundMessageDirectory") Path inboundMessageDirectory,
@Qualifier("consumableFileFilter") FileListFilter<File> fileListFilter,
@Qualifier(ChannelNames.TO_GZIP_SERVICE) MessageChannel outbound,
@Qualifier("transactionSynchronizationFactory") TransactionSynchronizationFactory transactionSynchronizationFactory) {
return IntegrationFlows
.from(s -> s.file(inboundMessageDirectory.toFile()),
e -> e.poller(Pollers
.fixedDelay(100)
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.transactional(new PseudoTransactionManager())
.get()))
.channel(outbound)
.get();
}

服务看起来像:

@ServiceActivator(inputChannel = ChannelNames.TO_GZIP_SERVICE, outputChannel = ChannelNames.TO_S3_SERVICE)
public Path gzip(@Payload Path path, @Header(ApplicationHeaders.REFERENCE_ID) String referenceId, @Header(ApplicationHeaders.DATA_TYPE) String dataType,
@Header(ApplicationHeaders.BUCKET_END) long bucketEnd) throws IOException {
// ...
}

@ServiceActivator(inputChannel = ChannelNames.TO_S3_SERVICE)
@Retryable(interceptor = RETRY_INTERCEPTOR_BEAN_NAME)
public void sendToS3(@Payload Path path, @Header(ApplicationHeaders.BUCKET_END) long bucketStart,
@Header(ApplicationHeaders.DATA_TYPE) String dataType) throws IOException {
// ...
}

我的自定义 TransactionSynchronizationProcessor(我使用 DefaultTransactionSynchronizationFactory)大致如下实现(未显示的代码从消息有效负载中提取路径并将其存储在 IntegrationResourceHolder 的属性中):

@Override
public void processBeforeCommit(IntegrationResourceHolder holder) {
updatePaths(holder);
}

@Override
public void processAfterCommit(IntegrationResourceHolder holder) {
updateAndSend(successChannel, holder);
}

@Override
public void processAfterRollback(IntegrationResourceHolder holder) {
updateAndSend(failureChannel, holder);
}

我的理解是,由于所有插页式 channel 都是直接 channel ,因此交易也应该包含服务。因为它会在提交之前、提交之后和回滚之后进行更新,所以我希望它在开始时访问消息,获取解压后的路径,然后在最后访问它并获得压缩后的路径——然后尝试采取行动他们。但是,只会提取解压后的路径。

显然,我对交易的应用(和理解)缺少一些东西。实现所需行为的正确方法是什么?

最佳答案

不对,你有点误会了。

有了 transactionaltransactionSynchronizationFactory,我们只需要担心 source。作为数据的最终结果对于事务资源并不重要。我们只需要知道提交或回滚交易的状态。

这就是为什么 IntegrationResourceHolder 有一个 message 属性来携带 TX 末尾要担心的源数据。

比如当我们回滚时尝试在 TX 的末尾成像一个数据库事务和数据。对,没有那个数据!只有TX的输入是重要的。

但是,您可以使用额外的 ResourceHolderResourceHolderSynchronization 来满足您的要求,以便在 S3 存储之前的某个地方使用。简单的 ThreadLocal 也可能有所帮助。但一定要在TX结束时正确清除。

从另一端考虑使用 s.file 中消息的 headers 携带您需要的一切。为此,AbstractMessageSourceAdvice 将很有用,因为 Spring Integration 4.2 .

关于Java Config 入站文件适配器事务管理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33417559/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com