gpt4 book ai didi

解决SpringBoot整合RocketMQ遇到的坑

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 30 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章解决SpringBoot整合RocketMQ遇到的坑由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

应用场景

在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换.

引入依赖

?
1
2
3
4
5
6
<!-- RocketMq Spring Boot Starter-->
< dependency >
  < groupId >org.apache.rocketmq</ groupId >
  < artifactId >rocketmq-spring-boot-starter</ artifactId >
  < version >2.0.4</ version >
  </ dependency >

消费者代码

?
1
2
3
4
5
6
7
@RocketMQMessageListener (consumerGroup = "${rocketmq.group}" ,topic = "${rocketmq.topic}" ,selectorExpression = "${rocketmq.selectorExpression}" )
public class Consumer implements RocketMQListener<String> {
     @Override
     public void onMessage(String s) {
         System.out.println( "消费到的数据为:" +s);
     }
}

问题排查

RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Override
     public void afterSingletonsInstantiated() {
         // 获取所有所有使用了RocketMQMessageListener注解的bean
         Map<String, Object> beans = this .applicationContext.getBeansWithAnnotation(RocketMQMessageListener. class );
         if (Objects.nonNull(beans)) {
             // 循环注册容器
             beans.forEach( this ::registerContainer);
         }
     }
     private void registerContainer(String beanName, Object bean) {
         Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
         // 校验当前bean是否实现了RocketMQListener接口
         if (!RocketMQListener. class .isAssignableFrom(bean.getClass())) {
             throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener. class .getName());
         }
         // 获取bean上的annotation
         RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener. class );
         // 解析group及topic,可支持表达式
         String consumerGroup = this .environment.resolvePlaceholders(annotation.consumerGroup());
         String topic = this .environment.resolvePlaceholders(annotation.topic());
         boolean listenerEnabled =
             ( boolean )rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                 .getOrDefault(topic, true );
         if (!listenerEnabled) {
             log.debug(
                 "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization." ,
                 consumerGroup, topic);
             return ;
         }
         validate(annotation);
         String containerBeanName = String.format( "%s_%s" , DefaultRocketMQListenerContainer. class .getName(),
             counter.incrementAndGet());
         GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
         // 注册bean的,调用createRocketMQListenerContainer
         genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer. class ,
             () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
         DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
             DefaultRocketMQListenerContainer. class );
         if (!container.isRunning()) {
             try {
                 container.start();
             } catch (Exception e) {
                 log.error( "Started container failed. {}" , container, e);
                 throw new RuntimeException(e);
             }
         }
         log.info( "Register the listener to container, listenerBeanName:{}, containerBeanName:{}" , beanName, containerBeanName);
     }
     private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
         RocketMQMessageListener annotation) {
         DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();       
         container.setRocketMQMessageListener(annotation);       
         String nameServer = environment.resolvePlaceholders(annotation.nameServer());
         nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
         String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
         container.setNameServer(nameServer);
         if (!StringUtils.isEmpty(accessChannel)) {
             container.setAccessChannel(AccessChannel.valueOf(accessChannel));
         }
         container.setTopic(environment.resolvePlaceholders(annotation.topic()));
         // 此处已经根据表达式将数据取出
         String tags = environment.resolvePlaceholders(annotation.selectorExpression());
         if (!StringUtils.isEmpty(tags)) {
             container.setSelectorExpression(tags);
         }
         container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
         // 此处将SelectorExpression的数据覆盖成了表达式
         container.setRocketMQMessageListener(annotation);
         container.setRocketMQListener((RocketMQListener)bean);
         container.setObjectMapper(objectMapper);
         container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         container.setName(name);  // REVIEW ME, use the same clientId or multiple?
         return container;
     }

问题解决

因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
  * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
     @Autowired
     private ApplicationContext applicationContext;
     @Autowired
     private StandardEnvironment environment;
     @Override
     public void afterPropertiesSet() throws Exception {
         Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener. class );
         for (Object bean : beans.values()){
             Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
             if (!RocketMQListener. class .isAssignableFrom(bean.getClass())) {
                 continue ;
             }
             RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener. class );
             InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
             Field field = invocationHandler.getClass().getDeclaredField( "memberValues" );
             field.setAccessible( true );
             Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
             for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
                 if (Objects.nonNull(entry)){
                     memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
                 }
             }
         }
     }
}

初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包.

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我.

原文链接:https://blog.csdn.net/xiaojun081004/article/details/104954802 。

最后此篇关于解决SpringBoot整合RocketMQ遇到的坑的文章就讲到这里了,如果你想了解更多关于解决SpringBoot整合RocketMQ遇到的坑的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com