- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在 Spring Boot 中实现一个网关,具有 REST 端点并将消息插入到 RabbitMQ 代理中。我需要处理错误,因此我使用 DLQ 配置了一个replyAddress,并使用我的 RabbitTemplate 配置了一个 SimpleMessageListenerContainer,以将其标记为“监听器”并能够使用回复队列。
它可以与“硬编码”bean 配合使用:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setReceiveTimeout(0);
template.setReplyTimeout(10000);
template.setExchange("inputExchange");
template.setRoutingKey("routing.1");
template.setReplyAddress("replyQueue1");
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
classMapper.setDefaultType(Event.class);
messageConverter.setClassMapper(classMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("replyQueue1");
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}
但是这个网关的目标是完全可配置的,因此不需要对通往 Rabbit 交换/队列的每个路由进行编码。
例如,我在 yaml 中有此配置:
routes:
service1:
exchange: inputExchange
queue: inputQueue1
routing: routing.1
replyQueue: replyQueue1
dlExchange: reply.dlx1
dlQueue: dlx.queue1.reply
receiveTimeout: 0
replyTimeout: 10000
preProcessors: package.processor.LowercaseProcessor
postProcessors: package.processor.UppercaseProcessor
service2:
exchange: inputExchange
queue: inputQueue2
routing: routing.2
所以我需要动态创建 RabbitTemplate 和 SimpleMessageListenerContainer 来为每个服务配置replyQueue、DLQ,...
我尝试使用此代码:
@Configuration
public class RabbitTemplatesConfiguration implements BeanFactoryAware {
@Autowired
private GatewayProperties properties;
@Autowired
private ConnectionFactory connectionFactory;
private BeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
@PostConstruct
public void configure() {
Assert.state(beanFactory instanceof ConfigurableBeanFactory, "wrong bean factory type");
ConfigurableBeanFactory configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
Map<String, ServiceProperties> routes = properties.getRoutes();
if (routes != null) {
for (String service : routes.keySet()) {
ServiceProperties props = routes.get(service);
createTemplate(configurableBeanFactory, service, props);
}
}
}
private void createTemplate(ConfigurableBeanFactory configurableBeanFactory, String service, ServiceProperties props) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange(props.getExchange());
template.setRoutingKey(props.getRouting());
template.setReplyAddress(props.getReplyQueue());
template.setReceiveTimeout(props.getReceiveTimeout());
template.setReplyTimeout(props.getReplyTimeout());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
classMapper.setDefaultType(Event.class);
messageConverter.setClassMapper(classMapper);
template.setMessageConverter(messageConverter);
configurableBeanFactory.registerSingleton(service + "Template", template);
if(!StringUtils.isEmpty(props.getReplyQueue())) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(props.getReplyQueue());
container.setMessageListener(new MessageListenerAdapter(template));
configurableBeanFactory.registerSingleton(service + "ListenerContainer", container);
container.afterPropertiesSet(); //added this but not working either
container.start(); //added this but not working either
}
}
}
但是当我收到回复队列上的响应时,出现此错误:
java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': replyQueue1
at org.springframework.util.Assert.state(Assert.java:70)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithFixed(RabbitTemplate.java:1312)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1251)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1218)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1189)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1156)
因此 SimpleMessageListenerContainer 似乎没有正确实例化/配置。
你知道问题出在哪里吗?
我的代码发送和接收:
@Autowired
private ApplicationContext context;
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private GatewayProperties properties;
@PostMapping("/{service}")
public ResponseEntity<Object> call(@PathVariable("service") String service, @RequestBody Event body) {
ServiceProperties serviceProperties = properties.getRoutes().get(service);
Queue queue = QueueBuilder.durable(serviceProperties.getQueue()).build();
rabbitAdmin.declareQueue(queue);
TopicExchange exchange = new TopicExchange(serviceProperties.getExchange());
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(serviceProperties.getRouting()));
Queue replyQueue = null;
if (!StringUtils.isEmpty(serviceProperties.getReplyQueue())) {
replyQueue = QueueBuilder.durable(serviceProperties.getReplyQueue()).withArgument("x-dead-letter-exchange", serviceProperties.getDlExchange()).build();
rabbitAdmin.declareQueue(replyQueue);
Queue dlQueue = QueueBuilder.durable(serviceProperties.getDlQueue()).build();
rabbitAdmin.declareQueue(dlQueue);
TopicExchange dlqExchange = new TopicExchange(serviceProperties.getDlExchange());
rabbitAdmin.declareExchange(dlqExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(dlQueue).to(dlqExchange).with(serviceProperties.getReplyQueue()));
}
RabbitTemplate template = (RabbitTemplate) context.getBean(service + "Template");
Event outputMessage = (Event) template.convertSendAndReceive(serviceProperties.getExchange(), serviceProperties.getRouting(), body, new CorrelationData(UUID.randomUUID().toString()));
//...
}
最佳答案
不清楚为什么要使用回复队列; RabbitMQ 现在提供了一种直接回复机制,消除了使用固定回复队列的大部分原因(一个异常(exception)是如果您需要 HA 回复队列)。
也就是说,问题是您将模板包装在 MessageListenerAdapter
中 - 这是不必要的(并且无论如何都不会工作) - 模板实现了 MessageListener
。
关于java - Spring AMQP动态创建RabbitTemplate和SimpleMessageListenerContainer,报错RabbitTemplate is not配置为MessageListener,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42972983/
我正在使用 XMS 7.5 客户端访问 IBM MQ,想了解有关 MessageListener 的一件事。当队列中存在多条消息时, 相关的 MessageListener 方法(即下面代码中的 Pr
this question的答案|解释如何将原型(prototype)范围与 一起使用当监听器不是线程安全的时,在 Spring AMQP 中。 另一位用户询问(在评论中)如何仅使用 Java 配置
如何对实现 spring-kafka MessageListener 接口(interface)的类进行单元测试?我有一个监听器类,我正在使用 onMessage 函数手动监听主题。这个函数很简单,就
我正在尝试使用 JMS 实现在 #Solace Appliance 上发送/接收消息。 我正在使用 SolConnectionFactoryImpl 工厂来创建连接。一切看起来都不错,但是当我尝试将
我正在使用 ActiveMQ。我的消费者代码是从 main 方法调用的。一旦主类终止,我希望 JMSMessageListener 已在队列中注册,并且每当“TestTopic”上有消息时,将调用 o
有没有办法在 MessageListener 中注入(inject)服务? @Autowired @Qualifier("myServices") MyServices myService
订阅附近的服务总是声明订阅成功但 MessageListener 从未调用过: 主 Activity .java SubscribeOptions options = new SubscribeOpt
我使用 Spring Boot 2.2,需要通过 JMS 接收消息。 我看到我们可以使用注释 @Component public class JMSReceiver { @JmsListene
我使用 Spring Boot 2.2,需要通过 JMS 接收消息。 我看到我们可以使用注释 @Component public class JMSReceiver { @JmsListene
我正在测试使用 Camel 和 ActiveMQ 的 JMS 请求/回复的示例。当camel为你创建监听器时,我可以让这个例子工作。即。 from("direct:entryPoint").inOut
我有一个 spring 应用程序,我想使用 JMS Message Groups处理特定 block 中的 JMS 消息(以及相同的事务等)。基本上说我有 5 个相关事件,我有一个 JMSTempla
我正在尝试使用 Nearby API 将来 self 的 Eddystone Beacon (micro:bit) 的 Beacon 消息发送到我的 Android 应用程序.我从文档中复制了所有代码
我已经实现了 Oracle Advanced Queue,并且正在编写一个监听器程序。以下是我的示例: package com.myprog; import java.io.File; import
我一直在使用 JMS 和 ActiveMQ。一切都在创造奇迹。我没有使用 spring,我也不会。 javax.jms.MessageListener 接口(interface)只有一个方法,onMe
我有一个使用 Spring 框架的 Java 应用程序。它是一个监听器应用程序,可以非常快速地使用线程从某个源异步获取消息并将其保存到数据库中。 我正在使用 ExecutorService,它在 Se
如何确保有两个监听器容器: 接收来自不同主题的消息。线程安全吗? 正如您所看到的,它们使
我正在尝试使用 spring amqp 来使用 rabbitmq,下面是我的配置。 这是一个简单的消息监听器类, import org.springframework.amqp
如果我为 MessageConsumer 设置一个 MessageListener 对象,然后对该消费者调用 receive(),会发生什么?提供者会将消息交给 receive(),还是在消息被 Me
使用 spring 部署应用程序后如何启动 ActiveMQ messageListener? 下面给出了我的 xml 文件, -->
当我在我的队列 MessageConsumer 对象上设置一个 messageListener 时,到底发生了什么。 MessageConsumer 对象是否在后台进行某种轮询,或者这是 JMS 服务
我是一名优秀的程序员,十分优秀!