gpt4 book ai didi

java - Spring AMQP 多线程事务

转载 作者:行者123 更新时间:2023-11-30 07:08:24 26 4
gpt4 key购买 nike

我正在使用 Spring AMQP(RabbitMQ 实现),并且尝试将单个事务传播到多个线程中。

举个例子,假设有 3 个队列,名称为 X、Y、Z,首先我使用线程 1 从队列 X 获取一条消息,然后将该消息传递给线程 0,并在线程0消息被克隆并通过线程2发送到队列Y,通过线程3发送到队列Z。线程 0 等待线程 3 和线程 4 完成,以提交或回滚消息。请注意,我在这里使用 4 个线程。

我想要的基本上是将这 3 个操作(获取消息并将其放入两个队列)作为单个事务处理。也就是说,如果我成功地将消息发送到队列Y,但未能将其发送到Z,那么发送到Y的消息将被回滚,原始消息也会被回滚到队列X。

到目前为止,我已经成功地通过 threadLocals(主要是 TransactionStatus 和 TransactionSynchronizationManager.resources)传播事务信息,并且能够将这 3 个操作绑定(bind)到一个事务中。

但我的问题是向原始队列 X 发送 ACK/NACK,即使我提交/回滚事务,它也仅适用于队列 Y 和 Z。从X获取的消息始终处于Unacked状态。

我尝试过channel.basicAck()、RabbitUtils.commitIfNecessary()等方法,但没有成功。

请注意,我也启用了channelTransacted。非常感谢任何帮助。

最佳答案

Spring 事务绑定(bind)到单个线程。您也许可以直接使用 RabbitMQ 客户端使其工作 - 但您必须在所有线程之间共享相同的 channel 。然而,RabbitMQ documentation强烈建议不要这样做:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. ...

无论如何,即使您在单线程上完成工作,RabbitMQ 事务也相当弱并且不能保证太多;请参阅broker semantics .

AMQP guarantees atomicity only when transactions involve a single queue, i.e. all the publishes inside the tx get routed to a single queue and all acks relate to messages consumed from the same queue. ...

关于java - Spring AMQP 多线程事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39654145/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com