- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章解决SpringBoot整合RocketMQ遇到的坑由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换.
1
2
3
4
5
6
|
<!-- RocketMq Spring Boot Starter-->
<
dependency
>
<
groupId
>org.apache.rocketmq</
groupId
>
<
artifactId
>rocketmq-spring-boot-starter</
artifactId
>
<
version
>2.0.4</
version
>
</
dependency
>
|
1
2
3
4
5
6
7
|
@RocketMQMessageListener
(consumerGroup =
"${rocketmq.group}"
,topic =
"${rocketmq.topic}"
,selectorExpression =
"${rocketmq.selectorExpression}"
)
public
class
Consumer
implements
RocketMQListener<String> {
@Override
public
void
onMessage(String s) {
System.out.println(
"消费到的数据为:"
+s);
}
}
|
RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
@Override
public
void
afterSingletonsInstantiated() {
// 获取所有所有使用了RocketMQMessageListener注解的bean
Map<String, Object> beans =
this
.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.
class
);
if
(Objects.nonNull(beans)) {
// 循环注册容器
beans.forEach(
this
::registerContainer);
}
}
private
void
registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// 校验当前bean是否实现了RocketMQListener接口
if
(!RocketMQListener.
class
.isAssignableFrom(bean.getClass())) {
throw
new
IllegalStateException(clazz +
" is not instance of "
+ RocketMQListener.
class
.getName());
}
// 获取bean上的annotation
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.
class
);
// 解析group及topic,可支持表达式
String consumerGroup =
this
.environment.resolvePlaceholders(annotation.consumerGroup());
String topic =
this
.environment.resolvePlaceholders(annotation.topic());
boolean
listenerEnabled =
(
boolean
)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic,
true
);
if
(!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization."
,
consumerGroup, topic);
return
;
}
validate(annotation);
String containerBeanName = String.format(
"%s_%s"
, DefaultRocketMQListenerContainer.
class
.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
// 注册bean的,调用createRocketMQListenerContainer
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.
class
,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.
class
);
if
(!container.isRunning()) {
try
{
container.start();
}
catch
(Exception e) {
log.error(
"Started container failed. {}"
, container, e);
throw
new
RuntimeException(e);
}
}
log.info(
"Register the listener to container, listenerBeanName:{}, containerBeanName:{}"
, beanName, containerBeanName);
}
private
DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container =
new
DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if
(!StringUtils.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
// 此处已经根据表达式将数据取出
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if
(!StringUtils.isEmpty(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
// 此处将SelectorExpression的数据覆盖成了表达式
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name);
// REVIEW ME, use the same clientId or multiple?
return
container;
}
|
因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/**
* 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据
**/
@Configuration
public
class
ChangeSelectorExpressionBeforeMQInit
implements
InitializingBean {
@Autowired
private
ApplicationContext applicationContext;
@Autowired
private
StandardEnvironment environment;
@Override
public
void
afterPropertiesSet()
throws
Exception {
Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.
class
);
for
(Object bean : beans.values()){
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if
(!RocketMQListener.
class
.isAssignableFrom(bean.getClass())) {
continue
;
}
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.
class
);
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
Field field = invocationHandler.getClass().getDeclaredField(
"memberValues"
);
field.setAccessible(
true
);
Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
for
(Map.Entry<String,Object> entry: memberValues.entrySet()) {
if
(Objects.nonNull(entry)){
memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
}
}
}
}
}
|
初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包.
以上为个人经验,希望能给大家一个参考,也希望大家多多支持我.
原文链接:https://blog.csdn.net/xiaojun081004/article/details/104954802 。
最后此篇关于解决SpringBoot整合RocketMQ遇到的坑的文章就讲到这里了,如果你想了解更多关于解决SpringBoot整合RocketMQ遇到的坑的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
前言 每日站会(Daily Standup)是团队统一节奏的、在固定时间发生的、帮助团队内部快速同步进展的敏捷实践活动: 站会的目的是让团队能更好地对齐 Sprint 目标;
jdbcTemplate 中的queryForList,你真的懂吗? 你想象中的queryForList是不是应该长成下面这种模样? String sql = "select *
python是一门清晰简洁的语言,如果你对一些细节不了解的话,就会掉入到那些深不见底的“坑”里,下面,我就来总结一些python里常见的坑。 列表创建和引用 嵌套列表的创建 使用*号来创建一个
如今,在DevOps当中建立安全体系显得比以往任何时候都更加重要。《2021年企业DevOps技能提升报告》指出,56%的受访者表示DevSecOps已经成为自动化工具中的一大必备要素。然而,D
前言 相信看到这个题目,可能大家都觉得是一个老生常谈的月经topic了。一直以来其实把握一个“值传递”基本上就能理解各种情况了,不过最近遇到了更深一点的“小坑”,与大家分享一下。 首先还是从最简
前言 Go 中的for range组合可以和方便的实现对一个数组或切片进行遍历,但是在某些情况下使用for range时很可能就会被"坑",下面用一段代码来模拟下:
大家好,我是明哥。 在开始之前,先考你一个非常 Go 味的经典问题:如何判断一个 interface{} 的值是否为 nil ? 这也是面试有可能会被问到的一个问题,这个问题很 “迷”,平时
ava并发包有很大一部分内容都是关于并发容器的,因此学习和搞懂这部分的内容很有必要。 Java 1.5 之前提供的同步容器虽然也能保证线程安全,但是性能很差,而 Java 1.5 版本之后提供的并发
大家好,我是煎鱼。 前几天在读者交流群里看到一位小伙伴,针对 interface 的使用有了比较大的疑惑。 无独有偶,我也在网上看到有小伙伴在 Go 面试的时候被问到了:
我是一名优秀的程序员,十分优秀!