gpt4 book ai didi

java - Spring Cloud Stream与RabbitMQ Binder,如何应用@Transactional?

转载 作者:行者123 更新时间:2023-11-30 06:08:22 24 4
gpt4 key购买 nike

我有一个Spring Cloud Stream接收来自 RabbitMQ 的事件的应用程序使用Rabbit Binder 。我的申请可以概括如下:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
// Transform events and store them in MongoDB using
// spring-boot-data-mongodb-reactive
...
}

问题是,@Transactional 似乎不适用于 Spring Cloud Stream(或者至少这是我的印象),因为如果写入 MongoDB 时出现异常,该事件似乎已经发生了已被确认到 RabbitMQ 并且不会重试该操作。

鉴于我希望实现与使用 spring-amqp 函数周围的 @Transactional 基本相同的功能:

  1. 使用 Spring 时是否必须手动向 RabbitMQ 确认消息Cloud Stream 与 Rabbit Binder?
  2. 如果是这样,我该如何实现这一目标?

最佳答案

这里有几个问题。

  1. 确认消息不需要事务
  2. 基于 Reactor @StreamListener方法只被调用一次,只是为了设置 Flux所以@Transactional在该方法上是没有意义的 - 消息然后流过 Flux,因此与单个消息相关的任何事情都必须在 Flux 的上下文中完成。
  3. Spring Transactions 绑定(bind)到线程 - Reactor 是非阻塞的;该消息将在第一次切换时得到确认。

是的,您需要使用手动确认;大概是 mongodb 存储操作的结果。您可能需要使用 Flux<Message<Event>>这样您就可以访问 channel 和转换标签 header 。

关于java - Spring Cloud Stream与RabbitMQ Binder,如何应用@Transactional?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50793728/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com