
java - Spring JMS ActiveMQ advisory topics for message delivery not working

Reposted. Author: 行者123. Updated: 2023-12-02 09:13:32

For POC purposes, I built a Spring Boot application that uses ActiveMQ for messaging via JmsTemplate.

For monitoring, I want to use advisory topics to listen for messages being put on the queue and removed from it.

I have updated the ActiveMQ configuration to enable the relevant advisories:

<!-- activemq.xml -->
<broker xmlns="http://activemq.apache.org/schema/core" useJmx="true" brokerName="localhost" dataDirectory="${activemq.data}">

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true">
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" advisoryForConsumed="true" advisoryForDelivery="true">
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>

</broker>

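In the application, I configured the JMS connection factory and the JMS listener container factory to enable advisories and the pub/sub domain, and set up listeners for the advisory topics: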

@Configuration
public class JmsConfig {

    @Autowired
    MessageListener messageListener;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setWatchTopicAdvisories(true);
        connectionFactory.setBrokerURL("vm://localhost?broker.persistent=false");
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) throws JMSException {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue bulkQueue = session.createQueue("bulk");

        Topic deliveredAdvisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(bulkQueue);
        MessageConsumer deliveredAdvisoryTopicConsumer = session.createConsumer(deliveredAdvisoryTopic);
        deliveredAdvisoryTopicConsumer.setMessageListener(messageListener);

        Topic consumedAdvisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic(bulkQueue);
        MessageConsumer consumedAdvisoryTopicConsumer = session.createConsumer(consumedAdvisoryTopic);
        consumedAdvisoryTopicConsumer.setMessageListener(messageListener);

        return factory;
    }
}

The listener that reads the advisory topics only logs the messages:

@Component
public class AdvisoryMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("Received advisory message");
        System.out.println(message);
    }
}

The actual listener that reads from the queue is similar to the advisory message listener:

@Component
public class Receiver {

    @JmsListener(destination = "bulk", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}

A REST API triggers the application to put a message on the queue:


@RestController("/emails")
public class EmailController {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/")
    public void persistEmail(@RequestBody Email email) {
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setTimeToLive(0L);
        jmsTemplate.convertAndSend("bulk", email);
    }
}

Whenever the API is called and an email is put on the queue, Receiver.receiveMessage reads it and logs it, but nothing happens in AdvisoryMessageListener.

The only thing that shows up in the console is the following line, printed by Receiver.receiveMessage: Received <Email{to=foo@bar.com, body=Hello}>

What am I doing wrong?

Best answer

This works fine for me...

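// Imports assume the javax.jms API (ActiveMQ 5.x with Spring Boot 2.x);
// newer stacks use jakarta.jms instead.
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So59196698Application {

    public static void main(String[] args) {
        SpringApplication.run(So59196698Application.class, args);
    }

    // Regular queue listener; uses the default (queue) container factory.
    @JmsListener(destination = "so59196698")
    public void listen(Message in) {
        System.out.println("Received:" + in);
    }

    // Advisory listeners; the destination names are resolved via SpEL from the
    // AdvisoryTopicNames bean, and the containers use the pub/sub factory below.
    @JmsListener(destination = "#{advisoryTopicNames.deliveredTopic}", containerFactory = "topicFactory")
    public void delivered(Message in) {
        System.out.println("Delivered:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.consumedTopic}", containerFactory = "topicFactory")
    public void consumed(Message in) {
        System.out.println("Consumed:" + in);
    }

    // Sends one test message after a short pause.
    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5000);
            template.convertAndSend("so59196698", "test");
        };
    }

    // Listener container factory for topics: pubSubDomain is set after the
    // configurer has applied the Boot defaults.
    @Bean
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

}

@Component
class AdvisoryTopicNames {

    private static final Destination QUEUE = new ActiveMQQueue("so59196698");

    public String getDeliveredTopic() throws JMSException {
        return AdvisorySupport.getMessageDeliveredAdvisoryTopic(QUEUE).getTopicName();
    }

    public String getConsumedTopic() throws JMSException {
        return AdvisorySupport.getMessageConsumedAdvisoryTopic(QUEUE).getTopicName();
    }

}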

Received:ActiveMQTextMessage {commandId = 11, ...
Delivered:ActiveMQMessage {commandId = 0, ...
Consumed:ActiveMQMessage {commandId = 0, ...
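
For orientation, the destination names that the AdvisoryTopicNames bean resolves should follow the advisory naming convention documented for ActiveMQ 5.x: a fixed prefix, the destination type, then the destination name. The following is a minimal plain-Java sketch of that convention, with no ActiveMQ dependency. The class AdvisoryTopicNaming and its methods are hypothetical names introduced here for illustration and are not part of ActiveMQ; in application code, AdvisorySupport remains the safer way to obtain these topics.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not an ActiveMQ class) that spells out the advisory
 * topic names ActiveMQ 5.x is documented to use for a queue.
 */
final class AdvisoryTopicNaming {

    // Prefixes as listed in the ActiveMQ advisory message documentation.
    static final String DELIVERED_PREFIX = "ActiveMQ.Advisory.MessageDelivered.";
    static final String CONSUMED_PREFIX = "ActiveMQ.Advisory.MessageConsumed.";

    private AdvisoryTopicNaming() {
    }

    /** Name of the topic that reports messages delivered from the given queue. */
    static String deliveredTopicForQueue(String queueName) {
        return DELIVERED_PREFIX + "Queue." + requireName(queueName);
    }

    /** Name of the topic that reports messages consumed from the given queue. */
    static String consumedTopicForQueue(String queueName) {
        return CONSUMED_PREFIX + "Queue." + requireName(queueName);
    }

    // Rejects null or empty queue names instead of producing a malformed topic name.
    private static String requireName(String queueName) {
        Objects.requireNonNull(queueName, "queueName");
        if (queueName.isEmpty()) {
            throw new IllegalArgumentException("queueName must not be empty");
        }
        return queueName;
    }

    public static void main(String[] args) {
        // prints "ActiveMQ.Advisory.MessageDelivered.Queue.so59196698"
        System.out.println(deliveredTopicForQueue("so59196698"));
        // prints "ActiveMQ.Advisory.MessageConsumed.Queue.so59196698"
        System.out.println(consumedTopicForQueue("so59196698"));
    }
}
```

Knowing the literal names can help when checking in the broker's console or JMX view whether the advisory topics exist and have subscribers.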

Regarding java - Spring JMS ActiveMQ advisory topics for message delivery not working, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59196698/
