gpt4 book ai didi

java - spring jmsListener 监听多个队列

转载 作者:行者123 更新时间:2023-11-30 05:41:30 32 4
gpt4 key购买 nike

在这篇文章中,Garry Russell 解释了如何以编程方式创建多个 KafkaListener 来监听多个主题。.[这个设置实际上对我来说很成功] Kafka Spring: How to create Listeners dynamically or in a loop?

现在我也希望对 JMSListener 进行类似的设置 - 我可以在一个类中包含一个 @JMSListener,并且我可以以编程方式创建该 JMSListener 的多个实例,每个实例都注入(inject)自己的queueName。

我找到了这篇文章 Spring JMS start listening to jms queues on request

在这篇文章的最后,加里发表了类似的评论,

If you wish to dynamically create lots of containers, then just create the containers programmatically, call afterPropertiesSet(), then start()

我使用了上面第一篇文章中的设置(与 KafkaListeners 相关),我的多个 JMS 监听器实例正在启动,但没有消耗任何消息。

基本上我不明白我该在哪里做

then just create the containers programmatically, call afterPropertiesSet(), then start()

我对容器这个词感到困惑,我知道有 JMSListener 并且有JmsListenerContainerFactory,在此上下文中什么是容器 - 我猜是 JMSListener?

我已确认队列中有消息。另外,当我不以编程方式创建监听器并且只有一个监听器及其上提到的硬编码队列时,它会很好地消耗消息。

当我以编程方式创建多个 JMS 监听器时,基本上没有监听器正在使用消息

    @SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
private static Consumers consumersStatic;

@Autowired
Consumers consumers;

@PostConstruct
public void init() {
consumersStatic = this.consumers;
}

@Bean
public Gson gson() {
return new Gson();
}

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
logger.debug("queueInformationList ************" + queueInformationList.toString());
for (QueueInformation queueInformation : queueInformationList) {
AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
child.setParent(context);
child.register(MQConfig.class);
Properties props = new Properties();
props.setProperty("mqQueueName", queueInformation.getMqQueueName());
//
PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
child.getEnvironment().getPropertySources().addLast(pps);
child.refresh();
}
}
}

这是具有listenerContainerFactory 的MQConfig

@Configuration
public class MQConfig {
Logger logger = LoggerFactory.getLogger(this.getClass());

@Value("${ibm.mq.user}")
private String mqUserName;

@Bean
public MQListener listener() {
return new MQListener();
}

@PostConstruct
public void afterConstruct() {
logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
}

@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);

// Put the MQ username in the PCF environment.
// Otherwise, the connection is identified by PCF's default user, "VCAP"
System.setProperty("user.name", mqUserName);
return factory;
}
}

然后是 MQListener,它具有实际的 @JMSListener

    public class MQListener {
Logger logger = LoggerFactory.getLogger(this.getClass());

@Value("${mqQueueName}")
private String mqQueueName;

@PostConstruct
public void afteConstruct() {
logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);

}

@JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
logger.debug("***********************************************receivedMessage:" + receivedMessage);
}
}

这是我的 application.yml

    ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
queueInformationList:
-
mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
-
mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD

最佳答案

好的,我找到了另一篇帖子,其中加里已经回答了我正在寻找的内容 Adding Dynamic Number of Listeners(Spring JMS)

基本上这就是我的工作解决方案。干得好@GaryRussell - 我现在是粉丝了:)

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
int i = 0;
for (QueueInformation queueInformation :
queueInformationList) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint-" + i++);
endpoint.setDestination(queueInformation.getMqQueueName());
endpoint.setMessageListener(message -> {
logger.debug("***********************************************receivedMessage:" + message);
});
registrar.registerEndpoint(endpoint);
logger.debug("registered the endpoint for queue" + queueInformation.getMqQueueName());
}
}

另请参阅https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms-annotated-programmatic-registration

关于java - spring jmsListener 监听多个队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55543440/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com