gpt4 book ai didi

通过Nacos配置刷新进行RabbitMQ消费者在线启停

转载 作者:我是一只小鸟 更新时间:2023-02-12 22:31:12 35 4
gpt4 key购买 nike

前提

公司在做一些金融相关业务,某些时候由于数据提供商定期维护或者特殊原因需要暂停某些服务的消费者。之前选用的消息队列技术栈是 RabbitMQ ,用于微服务之间的消息投递,对于这类需要暂停消费者的场景是选用注释掉消费者 Bean 中的相应 Spring(Boot) 注解重新发布来实现,后面需要重新启动消费就是解开对应的注释再发布一次。这样的处理流程既繁琐,也显得没有技术含量,所以笔者就这个问题结合已有的配置中心 Nacos 集群做了一个方案,使用 Nacos 的配置准实时刷新功能去控制某个微服务实例的所有 RabbitMQ 消费者(容器)的停止和启动.

spring-boot-rabbit-nacos-control-1

方案原理

下面探讨一下方案的原理和可行性,主要包括:

  • RabbitMQ消费者生命周期管理
  • Nacos长轮询与配置刷新

因为工作中的主要技术栈是 SpringBoot + RabbitMQ ,下文是探讨场景针对 spring-boot-starter-amqp (下面简称 amqp )展开.

使用SpringBoot版本为2.3.0.RELEASE,spring-cloud-alibaba-nacos-config的版本为2.2.0.RELEASE 。

RabbitMQ消费者生命周期管理

查看 RabbitAnnotationDrivenConfiguration 的源码:

spring-boot-rabbit-nacos-control-2

amqp 中默认启用 spring.rabbitmq.listener.type=simple ,使用的 RabbitListenerContainerFactory (消息监听器容器工厂)实现为 SimpleRabbitListenerContainerFactory ,使用的 MessageListenerContainer (消息监听器容器)实现为 SimpleMessageListenerContainer 。在 amqp 中,无论注解声明式或者编程式注册的消费者最终都会封装为 MessageListenerContainer 实例,因此消费者生命周期可以直接通过 MessageListenerContainer 进行管理, MessageListenerContainer 的生命周期管理 API 会直接作用于最底层的真实消费者实现 BlockingQueueConsumer 。几者的关系如下:

spring-boot-rabbit-nacos-control-3

一般声明式消费者注册方式如下:

                        
                          @Slf4j
@RabbitListener(id = "SingleAnnoMethodDemoConsumer", queues = "srd->srd.demo")
@Component
public class SingleAnnoMethodDemoConsumer {

    @RabbitHandler
    public void onMessage(Message message) {
        log.info("SingleAnnoMethodDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

@RabbitListener(id = "MultiAnnoMethodDemoConsumer", queues = "srd->srd.demo")
@Component
@Slf4j
public class MultiAnnoMethodDemoConsumer {

    @RabbitHandler
    public void firstOnMessage(Message message) {
        log.info("MultiAnnoMethodDemoConsumer.firstOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitHandler
    public void secondOnMessage(Message message) {
        log.info("MultiAnnoMethodDemoConsumer.secondOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

@Component
@Slf4j
public class MultiAnnoInstanceDemoConsumer {

    @RabbitListener(id = "MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage", queues = "srd->srd.demo")
    public void firstOnInstanceMessage(Message message) {
        log.info("MultiAnnoInstanceDemoConsumer.firstOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(id = "MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage", queues = "srd->srd.sec")
    public void secondOnInstanceMessage(Message message) {
        log.info("MultiAnnoInstanceDemoConsumer.secondOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

                        
                      

对于基于 @RabbitListener 进行声明式注册的消费者,每个被 @RabbitListener 修饰的 Bean 或者方法最终都会单独生成一个 SimpleMessageListenerContainer 实例,这些 SimpleMessageListenerContainer 实例的唯一标识由 @RabbitListener 的 id 属性指定,缺省值为 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N ,建议在使用时候通过规范约束必须定义此 id 属性。分析源码可以得知这类型的消费者通过 RabbitListenerAnnotationBeanPostProcessor 进行发现和自动注册,并且在 RabbitListenerEndpointRegistry 缓存了注册信息,因此可以通过 RabbitListenerEndpointRegistry 直接获取这些声明式的消费者容器实例:

                        
                          RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                RabbitListenerEndpointRegistry.class);
Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
for (String containerId : listenerContainerIds) {
    MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
    // do something with messageListenerContainer
}

                        
                      

一般编程式消费者注册方式如下:

                        
                          // MessageListenerDemoConsumer
@Component
@Slf4j
public class MessageListenerDemoConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        log.info("MessageListenerDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

// CustomMethodDemoConsumer
@Component
@Slf4j
public class CustomMethodDemoConsumer {

    public void customOnMessage(Message message) {
        log.info("CustomMethodDemoConsumer.customOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

// configuration class
// 通过现存的MessageListener实例进行消费
@Bean
public SimpleMessageListenerContainer messageListenerDemoConsumerContainer(
        ConnectionFactory connectionFactory,
        @Qualifier("messageListenerDemoConsumer") MessageListener messageListener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setListenerId("MessageListenerDemoConsumer");
    container.setConnectionFactory(connectionFactory);
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(1);
    container.setQueueNames("srd->srd.demo");
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setPrefetchCount(10);
    container.setAutoStartup(true);
    container.setMessageListener(messageListener);
    return container;
}

// 通过IOC容器中某个Bean的具体方法进行消费
@Bean
public SimpleMessageListenerContainer customMethodDemoConsumerContainer(
        ConnectionFactory connectionFactory,
        CustomMethodDemoConsumer customMethodDemoConsumer) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setListenerId("CustomMethodDemoConsumer");
    container.setConnectionFactory(connectionFactory);
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(1);
    container.setQueueNames("srd->srd.demo");
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setPrefetchCount(10);
    container.setAutoStartup(true);
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
    messageListenerAdapter.setDelegate(customMethodDemoConsumer);
    messageListenerAdapter.setDefaultListenerMethod("customOnMessage");
    container.setMessageListener(messageListenerAdapter);
    return container;
}

                        
                      

编程式注册的 SimpleMessageListenerContainer 可以直接从 IOC 容器中获取:

                        
                          Map<String, MessageListenerContainer> messageListenerContainerBeans
        = configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
    messageListenerContainerBeans.forEach((beanId, messageListenerContainer) -> {
        // do something with messageListenerContainer
    });
}

                        
                      

至此,我们知道可以比较轻松地拿到服务中所有的 MessageListenerContainer 的实例,从而可以管理服务内所有消费者的生命周期.

Nacos长轮询与配置刷新

Nacos 的客户端通过 LongPolling (长轮询)的方式监听 Nacos 服务端集群对应 dataId 和 group 的配置数据变更,具体可以参考 ClientWorker 的源码实现,实现的过程大致如下:

spring-boot-rabbit-nacos-control-4

在非 Spring(Boot) 体系中,可以通过 ConfigService#addListener() 进行配置变更监听,示例代码如下:

                        
                          Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
properties.put(PropertyKeyConst.NAMESPACE, "LOCAL");
ConfigService configService = NacosFactory.createConfigService(properties);
Executor executor = Executors.newSingleThreadExecutor(runnable -> {
    Thread thread = new Thread(runnable);
    thread.setDaemon(true);
    thread.setName("NacosConfigSyncWorker");
    return thread;
});
configService.addListener("application-aplha.properties", "customer-service", new Listener() {
    @Override
    public Executor getExecutor() {
        return executor;
    }

    @Override
    public void receiveConfigInfo(String configInfo) {
            // do something with 'configInfo'
    }
});

                        
                      

这种 LongPolling 的方式目前来看可靠性是比较高,因为 Nacos 服务端集群一般在生产部署是大于 3 的奇数个实例节点,并且底层基于 raft 共识算法实现集群通讯,只要不是同一时间超过半数节点宕机集群还是能正常提供服务。但是从实现上来看会有一些局限性:

  • 如果注册过多的配置变更监听器有可能会对 Nacos 服务端造成比较大的压力,毕竟是多个客户端进行轮询
  • 配置变更是由 Nacos 客户端向 Nacos 服务端发起请求,因此监听器回调有可能不是实时的(有可能延迟到客户端下一轮的 LongPolling 提交)
  • Nacos 客户端会缓存每次从 Nacos 服务端拉取的配置内容,如果要变更配置文件过大有可能导致缓存的数据占用大量内存,影响客户端所在服务的性能

关于配置变更监听其实有其他候选的方案,例如Redis的发布订阅,Zookeeper的节点路径变更监听甚至是使用消息队列进行通知,本文使用Nacos配置变更监听的原因是更好的划分不同应用配置文件的编辑查看权限方便进行管理,其他候选方案要实现分权限管理需要二次开发 。

使用 SpringCloudAlibaba 提供的 spring-cloud-alibaba-nacos-config 可以更加简便地使用 Nacos 配置刷新监听,并且会把变更的 PropertySource 重新绑定到对应的配置属性 Bean 。引入依赖:

                        
                          <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
</dependency>

                        
                      

具体的配置类是 NacosConfigProperties :

spring-boot-rabbit-nacos-control-5

红圈中是需要关注的配置项, refreshEnabled 是配置刷新的开关,默认是开启的。 sharedConfigs 和 extensionConfigs 虽然命名不同,但是两者实现和功能没有差异,都是类似于共享或者说扩展配置,每个共享(扩展)配置支持单独配置刷新开关。举个例子,在 Nacos 服务端的某个配置如下图:

spring-boot-rabbit-nacos-control-6

为了支持配置变更和对应的实体类成员变量更新,对应客户端的配置文件是这样的:

                        
                          spring.cloud.nacos.config.refresh-enabled=true
spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
spring.cloud.nacos.config.shared-configs[0].group=shared-conf
spring.cloud.nacos.config.shared-configs[0].refresh=true

                        
                      

对应的配置属性 Bean 如下:

                        
                          @Data
@ConfigurationProperties(prefix = "shared")
public class SharedProperties {

    private String foo; 
}

                        
                      

只要客户端所在 SpringBoot 服务启动完成后,修改 Nacos 服务端对应 dataId 为 shared.properties 的 shared.foo 属性值,那边 SharedProperties 的 foo 属性就会准实时刷新。可以在 SharedProperties 添加一个 @PostConstruct 来观察这个属性更新的过程:

                        
                          @Slf4j
@Data
@ConfigurationProperties(prefix = "shared")
public class SharedProperties {

    private final AtomicBoolean firstInit = new AtomicBoolean();

    private String foo;

    @PostConstruct
    public void postConstruct() {
        if (!firstInit.compareAndSet(false, true)) {
            log.info("SharedProperties refresh...");
        } else {
            log.info("SharedProperties first init...");
        }
    }
}

                        
                      

方案实施

整个方案实施包括下面几步:

  • 配置变更通知与配置类刷新
  • 发现所有消费者容器
  • 管理消费者容器生命周期

初始化一个 Maven 项目,引入下面的依赖:

  • org.projectlombok:lombok:1.18.12
  • org.springframework.boot:spring-boot-starter-web:2.3.0.RELEASE
  • org.springframework.boot:spring-boot-starter-amqp:2.3.0.RELEASE
  • com.alibaba.cloud:spring-cloud-alibaba-nacos-config:2.2.0.RELEASE
  • com.alibaba.nacos:nacos-client:1.4.4

下载 Nacos 服务并且启动一个单机实例(当前 2023-02 的最新稳定版为 2.2.0 ),新建命名空间 LOCAL 并且添加四份配置文件:

spring-boot-rabbit-nacos-control-7

可以使用1.x的Nacos客户端去连接2.x的Nacos服务端,这个是Nacos做的向下兼容,反过来不行 。

前文提到的 Nacos 客户端中, ConfigService 是通过 dataId 和 group 定位到具体的配置文件,一般 dataId 按照配置文件的内容命名,对于 SpringBoot 的应用配置文件一般命名为 application-${profile}.[properties,yml] , group 是配置文件的分组,对于 SpringBoot 的应用配置文件一般命名为 ${spring.application.name} 。笔者在在这份 SpringBoot 的应用配置文件中只添加了 RabbitMQ 的配置:

spring-boot-rabbit-nacos-control-8

确保本地或者远程有一个可用的 RabbitMQ 服务,接下来往下开始实施方案.

配置变更通知与配置类刷新

前面已经提到过 SpringBoot 结合 Nacos 进行配置属性 Bean 的成员变量刷新,在项目的 Classpath ( resources 文件夹)添加 bootstrap.properties 文件,内容如下:

                        
                          spring.application.name=rabbitmq-rocketmq-demo
spring.profiles.active=default
# nacos配置
spring.cloud.nacos.config.enabled=true
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.cloud.nacos.config.namespace=LOCAL
spring.cloud.nacos.config.group=rabbitmq-rocketmq-demo
spring.cloud.nacos.config.prefix=application
spring.cloud.nacos.config.file-extension=properties
spring.cloud.nacos.config.refresh-enabled=true
spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
spring.cloud.nacos.config.shared-configs[0].group=shared-conf
spring.cloud.nacos.config.shared-configs[0].refresh=true
spring.cloud.nacos.config.extension-configs[0].data-id=extension.properties
spring.cloud.nacos.config.extension-configs[0].group=extension-conf
spring.cloud.nacos.config.extension-configs[0].refresh=true
spring.cloud.nacos.config.extension-configs[1].data-id=rabbitmq-toggle.properties
spring.cloud.nacos.config.extension-configs[1].group=rabbitmq-rocketmq-demo
spring.cloud.nacos.config.extension-configs[1].refresh=true

                        
                      

这里 profile 定义为 default 也就是会关联到 Nacos 中 dataId = 'application.properties', group = 'rabbitmq-rocketmq-demo' 那份配置文件,主要是用于定义 amqp 需要的配置属性。对于 RabbitMQ 消费者的开关,定义在 dataId = 'rabbitmq-toggle.properties', group = 'rabbitmq-rocketmq-demo' 的文件中。添加 RabbitmqToggleProperties :

                        
                          // RabbitmqToggleProperties
@Slf4j
@Data
@ConfigurationProperties(prefix = "rabbitmq.toggle")
public class RabbitmqToggleProperties {

    private final AtomicBoolean firstInit = new AtomicBoolean();

    private List<RabbitmqConsumer> consumers;

    @PostConstruct
    public void postConstruct() {
        if (!firstInit.compareAndSet(false, true)) {
            StaticEventPublisher.publishEvent(new RabbitmqToggleRefreshEvent(this));
            log.info("RabbitmqToggleProperties refresh, publish RabbitmqToggleRefreshEvent...");
        } else {
            log.info("RabbitmqToggleProperties first init...");
        }
    }

    @Data
    public static class RabbitmqConsumer {

        private String listenerId;

        private Integer concurrentConsumers;

        private Integer maxConcurrentConsumers;

        private Boolean enable;
    }
}

// RabbitmqToggleRefreshEvent
@Getter
public class RabbitmqToggleRefreshEvent extends ApplicationEvent {

    private final RabbitmqToggleProperties rabbitmqToggleProperties;

    public RabbitmqToggleRefreshEvent(RabbitmqToggleProperties rabbitmqToggleProperties) {
        super("RabbitmqToggleRefreshEvent");
        this.rabbitmqToggleProperties = rabbitmqToggleProperties;
    }
}

// StaticEventPublisher
public class StaticEventPublisher {

    private static ApplicationEventPublisher PUBLISHER = null;

    public static void publishEvent(ApplicationEvent applicationEvent) {
        if (Objects.nonNull(PUBLISHER)) {
            PUBLISHER.publishEvent(applicationEvent);
        }
    }

    public static void attachApplicationEventPublisher(ApplicationEventPublisher publisher) {
        PUBLISHER = publisher;
    }
}

                        
                      

这里 prefix 定义为 rabbitmq.toggle ,为了和 rabbitmq-toggle.properties 的属性一一绑定,该文件中的配置 Key 必须以 rabbitmq.toggle 为前缀。 RabbitmqToggleProperties 首次回调 @PostConstruct 方法只打印初始化日志,再次回调 @PostConstruct 方法则发布 RabbitmqToggleRefreshEvent 事件,用于后面通知对应的消费者容器 Bean 进行启停.

发现所有消费者容器

为了统一管理服务中所有消费者容器 Bean ,需要定义一个类似于消费者容器注册或者缓存中心类,缓存 Key 可以考虑使用 listenerId , Value 就直接使用 MessageListenerContainer 实例即可:

                        
                          private final ConcurrentMap<String, MessageListenerContainer> containerCache = Maps.newConcurrentMap();

                        
                      

这里既然选定了listenerId作为缓存的Key,那么必须定义好规范,要求无论注解声明式定义的消费者还是编程式定义的消费者,必须明确指定具体意义的listenerId,否则到时候存在Key的格式为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N会比较混乱 。

接下来发现和缓存所有消费者容器:

                        
                          private ConfigurableListableBeanFactory configurableListableBeanFactory;

private ApplicationEventPublisher applicationEventPublisher;

// ----------------------------------------------------------------------

// 获取声明式消费者容器
RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
        RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
        RabbitListenerEndpointRegistry.class);
Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
for (String containerId : listenerContainerIds) {
    MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
    containerCache.putIfAbsent(containerId, messageListenerContainer);
}
// 获取编程式消费者容器
Map<String, MessageListenerContainer> messageListenerContainerBeans
        = configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
    messageListenerContainerBeans.forEach((beanId, bean) -> {
        if (bean instanceof AbstractMessageListenerContainer) {
            AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) bean;
            String listenerId = abstractMessageListenerContainer.getListenerId();
            if (StringUtils.hasLength(listenerId)) {
                containerCache.putIfAbsent(listenerId, abstractMessageListenerContainer);
            } else {
                containerCache.putIfAbsent(beanId, bean);
            }
        } else {
            containerCache.putIfAbsent(beanId, bean);
        }
    });
}
Set<String> listenerIds = containerCache.keySet();
listenerIds.forEach(listenerId -> log.info("Cache message listener container => {}", listenerId));
// 所有消费者容器Bean发现完成后才接收刷新事件
StaticEventPublisher.attachApplicationEventPublisher(this.applicationEventPublisher);

                        
                      

StaticEventPublisher 中的 ApplicationEventPublisher 属性延迟到所有消费者容器缓存完成后赋值,防止过早的属性变更通知导致部分消费者容器的启停操作被忽略.

管理消费者容器生命周期

接收到 RabbitmqToggleRefreshEvent 事件后,然后遍历传递过来的 RabbitmqToggleProperties 里面的 consumers ,再基于已经发现的消费者容器进行处理,代码大概如下:

                        
                          @EventListener(classes = RabbitmqToggleRefreshEvent.class)
public void onRabbitmqToggleRefreshEvent(RabbitmqToggleRefreshEvent event) {
    RabbitmqToggleProperties rabbitmqToggleProperties = event.getRabbitmqToggleProperties();
    List<RabbitmqToggleProperties.RabbitmqConsumer> consumers = rabbitmqToggleProperties.getConsumers();
    if (!CollectionUtils.isEmpty(consumers)) {
        consumers.forEach(consumerConf -> {
            String listenerId = consumerConf.getListenerId();
            if (StringUtils.hasLength(listenerId)) {
                MessageListenerContainer messageListenerContainer = containerCache.get(listenerId);
                if (Objects.nonNull(messageListenerContainer)) {
                    // running -> stop
                    if (messageListenerContainer.isRunning() && Objects.equals(Boolean.FALSE, consumerConf.getEnable())) {
                        messageListenerContainer.stop();
                        log.info("Message listener container => {} stop successfully", listenerId);
                    }
                    // modify concurrency
                    if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
                        SimpleMessageListenerContainer simpleMessageListenerContainer
                                = (SimpleMessageListenerContainer) messageListenerContainer;
                        if (Objects.nonNull(consumerConf.getConcurrentConsumers())) {
                            simpleMessageListenerContainer.setConcurrentConsumers(consumerConf.getConcurrentConsumers());
                        }
                        if (Objects.nonNull(consumerConf.getMaxConcurrentConsumers())) {
                            simpleMessageListenerContainer.setMaxConcurrentConsumers(consumerConf.getMaxConcurrentConsumers());
                        }
                    }
                    // stop -> running
                    if (!messageListenerContainer.isRunning() && Objects.equals(Boolean.TRUE, consumerConf.getEnable())) {
                        messageListenerContainer.start();
                        log.info("Message listener container => {} start successfully", listenerId);
                    }
                }
            }
        });
    }
}

                        
                      

修改 Nacos 服务里面的 rabbitmq-toggle.properties 文件,输入内容如下:

                        
                          rabbitmq.toggle.consumers[0].listenerId=MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage
rabbitmq.toggle.consumers[0].enable=true
rabbitmq.toggle.consumers[1].listenerId=MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage
rabbitmq.toggle.consumers[1].enable=true
rabbitmq.toggle.consumers[2].listenerId=MultiAnnoMethodDemoConsumer
rabbitmq.toggle.consumers[2].enable=true
rabbitmq.toggle.consumers[3].listenerId=SingleAnnoMethodDemoConsumer
rabbitmq.toggle.consumers[3].enable=true
rabbitmq.toggle.consumers[4].listenerId=CustomMethodDemoConsumer
rabbitmq.toggle.consumers[4].enable=true
rabbitmq.toggle.consumers[5].listenerId=MessageListenerDemoConsumer
rabbitmq.toggle.consumers[5].enable=true

                        
                      

启动项目,观察 RabbitMQ WebUI 对应的队列消费者数量:

spring-boot-rabbit-nacos-control-9

然后随机修改 rabbitmq-toggle.properties 文件某个消费者容器设置为 enable = 'fasle' ,观察服务日志和观察 RabbitMQ WebUI 的变化:

spring-boot-rabbit-nacos-control-10

可见 RabbitMQ WebUI 中队列消费者数量减少,服务日志也提示 listenerId = 'MessageListenerDemoConsumer' 的消费者容器被停止了.

一些思考

为了更精确控制有消费者容器的启停,可以考虑在配置文件中定义关闭消费者容器的自动启动开关:

                        
                          spring.rabbitmq.listener.simple.auto-startup=false

                        
                      

可以考虑在 RabbitmqToggleProperties 首次回调 @PostConstruct 方法时候发布 RabbitmqToggleInitEvent 事件,然后监听此事件启动所有已经发现的消费者容器。这样就能做到应用内部的消费者的启停行为总是以 Nacos 的开关配置文件为准,并且可以实现 在线 启停和动态调整最小最大消费者数量.

另外,如果细心的话能够观察到服务日志中,每当监听到 Nacos 配置变动会打印 Started application in N seconds (JVM running for M) 的日志,这个并不是服务重启了,而是启动了一个 Spring 子容器用于构建一个全新的 StandardEnvironment (见文末 Demo 项目中的 EnvironmentCaptureApplicationRunner )用来承载刷新后的配置文件内容,然后再拷贝或者覆盖到当前的 Spring 容器中的 PropertySources ,这个过程的代码实现类似这样:

spring-boot-rabbit-nacos-control-11

小结

本文探讨了一种通过 Nacos 配置刷新方式管理 SpringBoot 服务中 RabbitMQ 消费者生命周期管理的方案,目前只是提供了完整的思路和一些 Demo 级别代码,后续应该会完善方案和具体的工程级别编码实现.

本文 Demo 项目仓库:

  • framework-mesh/rabbitmq-rocketmq-demo

(本文完 c-3-d e-a-20230212) 。

最后此篇关于通过Nacos配置刷新进行RabbitMQ消费者在线启停的文章就讲到这里了,如果你想了解更多关于通过Nacos配置刷新进行RabbitMQ消费者在线启停的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com