gpt4 book ai didi

java - 使用 Spring 集成实现 MongoDB 入站流

转载 作者:行者123 更新时间:2023-11-30 06:04:20 25 4
gpt4 key购买 nike

我们将有一个包含多个工作单元的 Mongo 集合。我的想法是,该文档将有一个带有四个选项的状态字段:未处理、正在处理、完成、失败。 Spring Integration 将配置为从此数据库读取并处理存储在那里的消息。

入站 Mongo DSL 流将根据 UNPROCESSED 值从集合中读取:

MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'status' : 'UNPROCESSED'}"));
return IntegrationFlows.from(messageSource)...

问题是:如果我有几台工作计算机从同一个数据库读取数据,我想阻止它们对同一行未处理的数据进行操作,因为我的轮询器使用 maxMessagesPerPoll 的保守值或消息处理需要一段时间。

看来正确的地方是使用 TransactionSynchronizationFactory 定义一个 ProcessBeforeCommit 阶段来将状态更新为 PROCESSING,并定义一个 ProcessAfterCommit 阶段来将状态更新为 PROCESSING。将状态更新为“完成”或“失败”。然而,在查看轮询器和事务管理器的 API 时,我并不清楚添加此功能的机制。 XML 中有一些示例,但我没有看到使用 DSL 的示例。

我还想确保 ProcessBeforeCommit 发生在数据库读取时而不是处理后......是吗?另外,如果这不是设计从 Mongo 集合读取的解决方案的最佳方式,请随时建议更好的架构。

最佳答案

不,ProcessBeforeCommitProcessAfterCommit 是非常接近的回调。它们肯定会在您的流程结束时发生。让我们假设您有一个类似的方法:

@Transactional
void foo() {}

当您调用此类方法时,事务在进入方法主体之前开始。当我们在执行后退出方法体时,会执行 beforeCommit 回调。它可能会失败,因为在我们的过程中,外部连接(DB?)可能会丢失。仅当没问题时,我们才会继续进行 afterCommit

您所要求的可以通过 AbstractMessageSourceAdvice 实现来完成:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#conditional-pollers 。因此,在 afterReceive() 实现中,您可以将文档更新为 PROCESSING,甚至决定返回 null 而不是消息:只是因为它在数据库中的状态已经是PROCESSING。这样的 Advice 可以注入(inject)到 PollerSpec 中:

/**
* Specify AOP {@link Advice}s for the {@code pollingTask}.
* @param advice the {@link Advice}s to use.
* @return the spec.
*/
public PollerSpec advice(Advice... advice) {

DONEFAILED 确实可以通过应用于 PollerSpecTransactionSynchronizationFactoryBean 来实现:

/**
* Specify the {@link TransactionSynchronizationFactory} to attach a
* {@link org.springframework.transaction.support.TransactionSynchronization}
* to the transaction around {@code poll} operation.
* @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
* @return the spec.
*/
public PollerSpec transactionSynchronizationFactory(

关于java - 使用 Spring 集成实现 MongoDB 入站流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51676101/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com