gpt4 book ai didi

java - Spring - Rabbitmq Listener需要在数据库更改期间暂停和恢复

转载 作者:行者123 更新时间:2023-12-02 09:26:42 26 4
gpt4 key购买 nike

嘿,要求是在更改后端表期间暂停rabbitmq监听器处理消息。此更改仅限于我的应用程序,因此不想关闭整个rabbitmq 实例。一旦该过程完成,我想再次启动听众。

我面临的问题我有 2 个监听器连接到 2 个共享“consumerconnectionFactory”的独立队列。当我终止连接时,只有没有任何开放 channel 的连接才会被终止,当我恢复连接时,我得到了一个之前不存在的额外连接。 你能帮忙吗?

我在下面分享我的 java 配置。

@Bean
public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(messagingAuditQueue);
container.setMessageListener(auditMessageListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
@Bean
public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(accessAuditQueue);
container.setMessageListener(accessLogListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}

这就是我为监听器进行 Java 配置的方式。

下面是用于启动和停止监听器的 RestController

@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
@Autowired
private List<MessageListenerContainer> listenerContainers;

@Autowired
private List<ConnectionFactory> connectionFactories;

@GetMapping("/stop")
public String stopMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.resetConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.shutdown();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - stop";
}

@GetMapping("/start")
public String startMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.createConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.start();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - start";
}

}

下面是我在本地看到的行为的图像。1. 初始连接列表 Initial connections list

  • 当连接停止时 休息调用 enter image description here
  • 2.1 队列连接仍处于 Activity 状态 enter image description here3. 连接开始时 Rest Call enter image description here

    最佳答案

    使用默认缓存模式 (CHANNEL),任何时候都应该只有一个连接,除非您将 RabbitTemplate 配置为 usePublisherConnection 设置为 true,在这种情况下,连接名称将为 api-audit.publisher

    由于您有两个名为 api-audit 的连接,因此发生了一些非常奇怪的事情。我怀疑您以某种方式加载了两个连接工厂,也许其中一个位于子应用程序上下文中?在一个应用程序上下文中不能有两个同名的 Bean。

    即您正在对其中一个调用 resetConnection,但不在另一个上调用。

    我建议您在 createConnection 中放置一个断点来查看谁在使用第二个 CF。

    顺便说一下,你应该在容器停止后重置连接;否则容器将进入恢复模式并可能重新打开连接,具体取决于时间。

    关于java - Spring - Rabbitmq Listener需要在数据库更改期间暂停和恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58294560/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com