gpt4 book ai didi

java - 如何创建幂等的 @RabbitListener

转载 作者:行者123 更新时间:2023-11-30 06:30:12 25 4
gpt4 key购买 nike

我们的配置是:1...n 个具有共享数据库的消息接收器。消息应该只被处理一次。

   @RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "message-queue", durable = "true"),
exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
key = MESSAGE_QUEUE1_RK)
)
public void receiveMessage(CustomMessage message) throws InterruptedException {
System.out.println("I have been received = " + message);
}

我们希望保证消息将被处理一次,我们有一个消息存储,其中包含已处理的消息的 ID。是否可以在 receiveMessage 之前 Hook 此检查?我们尝试查看带有rabbitTemplate 的MessagePostProcessor,但似乎不起作用。

关于如何做到这一点有什么建议吗?我们尝试使用 MethodInterceptor,这可以工作,但非常丑陋。谢谢

<小时/>

找到解决方案 - 感谢加里我创建了一个实现 SmartLifecycleMessagePostProcessorInjector启动时,我检查每个容器,如果它是 AbstractMessageListenerContainer,则添加客户 MessagePostProccesser和一个自定义的 ErrorHandler ,它查找特定类型的异常并丢弃它们(其他转发到 defaultErrorHandler )由于我们使用 DLQ,我发现抛出异常或设置为 null 并不会真正起作用。

我将发出拉取请求以忽略 MPP 之后的空消息。

最佳答案

有趣; SimpleMessageListenerContainer 确实有一个属性 afterReceivePostProcessors(当前无法通过注释使用的监听器容器工厂获得,但可以稍后注入(inject))。

但是,这些后处理器不会有帮助,因为我们仍然调用监听器。

请随时打开JIRA Improvement Issue有两件事:

  1. 在监听器容器工厂中公开 afterReceivePostProcessors
  2. 如果后处理器返回 null,则跳过调用监听器方法。

(更正,该属性确实是由工厂暴露的)。

编辑

它是如何工作的...

在上下文初始化期间...

  1. 对于 Bean 后处理器检测到的每个注释,都会创建容器并在 RabbitListenerEndpointRegistry 中注册
  2. 在上下文初始化即将结束时,注册表将被 start()ed,并启动为 autoStartup 配置的所有容器(默认)。

要在启动之前对容器进行进一步配置(例如,对于容器工厂当前未公开的属性),请将 autoStartup 设置为 false

然后,您可以从注册表获取容器(作为集合或通过 id)。只需@Autowire应用程序中的注册表即可。

将容器转换为 SimpleMessageListenerContainer(或者,如果使用 Spring AMQP 2.0 或更高版本并且您使用其工厂,则转换为 DirectMessageListenerContainer)。

设置附加属性(例如afterReceiveMessagePostProcessors);然后start()容器。

注意:在我们增强容器以允许返回 null 的 MPP 之前,可能的替代方案是从 MPP 抛出 AmqpRejectAndDontRequeueException。但是,如果您配置了 DLQ,这可能不是您想要的。

关于java - 如何创建幂等的 @RabbitListener,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46314742/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com