java - Spring AMQP: dynamically creating RabbitTemplate and SimpleMessageListenerContainer fails with "RabbitTemplate is not configured as MessageListener"

Reposted. Author: 行者123. Updated: 2023-12-02 02:56:30
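I'm trying to implement a gateway in Spring Boot that exposes REST endpoints and publishes messages to a RabbitMQ broker. I need to handle errors, so I configured a replyAddress with a DLQ, and a SimpleMessageListenerContainer with my RabbitTemplate, to mark the template as a "listener" and be able to use the reply queue.

It works with "hard-coded" beans: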

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReceiveTimeout(0);
        template.setReplyTimeout(10000);
        template.setExchange("inputExchange");
        template.setRoutingKey("routing.1");
        template.setReplyAddress("replyQueue1");

        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        classMapper.setDefaultType(Event.class);
        messageConverter.setClassMapper(classMapper);
        template.setMessageConverter(messageConverter);

        return template;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("replyQueue1");
        container.setMessageListener(rabbitTemplate(connectionFactory));
        return container;
    }

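But the goal of this gateway is to be fully configurable, so that each route to a Rabbit exchange/queue does not have to be coded by hand.

For example, I have this configuration in YAML: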

    routes:
      service1:
        exchange: inputExchange
        queue: inputQueue1
        routing: routing.1
        replyQueue: replyQueue1
        dlExchange: reply.dlx1
        dlQueue: dlx.queue1.reply
        receiveTimeout: 0
        replyTimeout: 10000
        preProcessors: package.processor.LowercaseProcessor
        postProcessors: package.processor.UppercaseProcessor
      service2:
        exchange: inputExchange
        queue: inputQueue2
        routing: routing.2
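
The property classes behind this YAML are not shown in the question. A minimal sketch of what they might look like follows, inferred only from the getters the question's code calls (`getRoutes()`, `getExchange()`, `getReplyQueue()`, and so on). The field types are assumptions, the Spring Boot binding annotations are deliberately left out, and the `preProcessors`/`postProcessors` keys are omitted for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Settings for one route; field names mirror the YAML keys (types are assumed). */
class ServiceProperties {
    private String exchange;
    private String queue;
    private String routing;
    private String replyQueue;   // optional: absent for service2
    private String dlExchange;   // optional
    private String dlQueue;      // optional
    private long receiveTimeout; // stays 0 when the key is absent
    private long replyTimeout;   // stays 0 when the key is absent

    public String getExchange() { return exchange; }
    public void setExchange(String exchange) { this.exchange = exchange; }
    public String getQueue() { return queue; }
    public void setQueue(String queue) { this.queue = queue; }
    public String getRouting() { return routing; }
    public void setRouting(String routing) { this.routing = routing; }
    public String getReplyQueue() { return replyQueue; }
    public void setReplyQueue(String replyQueue) { this.replyQueue = replyQueue; }
    public String getDlExchange() { return dlExchange; }
    public void setDlExchange(String dlExchange) { this.dlExchange = dlExchange; }
    public String getDlQueue() { return dlQueue; }
    public void setDlQueue(String dlQueue) { this.dlQueue = dlQueue; }
    public long getReceiveTimeout() { return receiveTimeout; }
    public void setReceiveTimeout(long receiveTimeout) { this.receiveTimeout = receiveTimeout; }
    public long getReplyTimeout() { return replyTimeout; }
    public void setReplyTimeout(long replyTimeout) { this.replyTimeout = replyTimeout; }
}

/** Root object: one ServiceProperties entry per service name under "routes". */
class GatewayProperties {
    private Map<String, ServiceProperties> routes = new LinkedHashMap<>();

    public Map<String, ServiceProperties> getRoutes() { return routes; }
    public void setRoutes(Map<String, ServiceProperties> routes) { this.routes = routes; }
}
```

With this shape, keys that a route leaves out (service2 has no reply settings) stay at their Java defaults, `null` for the strings and `0` for the timeouts, so code that consumes the properties has to check for them.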

So I need to create the RabbitTemplate and SimpleMessageListenerContainer dynamically, in order to configure the replyQueue, DLQ, ... for each service.

I tried with this code:

    @Configuration
    public class RabbitTemplatesConfiguration implements BeanFactoryAware {

        @Autowired
        private GatewayProperties properties;
        @Autowired
        private ConnectionFactory connectionFactory;

        private BeanFactory beanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = beanFactory;
        }

        @PostConstruct
        public void configure() {
            Assert.state(beanFactory instanceof ConfigurableBeanFactory, "wrong bean factory type");
            ConfigurableBeanFactory configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;

            Map<String, ServiceProperties> routes = properties.getRoutes();
            if (routes != null) {
                for (String service : routes.keySet()) {
                    ServiceProperties props = routes.get(service);
                    createTemplate(configurableBeanFactory, service, props);
                }
            }
        }

        private void createTemplate(ConfigurableBeanFactory configurableBeanFactory, String service, ServiceProperties props) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setExchange(props.getExchange());
            template.setRoutingKey(props.getRouting());
            template.setReplyAddress(props.getReplyQueue());

            template.setReceiveTimeout(props.getReceiveTimeout());
            template.setReplyTimeout(props.getReplyTimeout());

            Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
            DefaultClassMapper classMapper = new DefaultClassMapper();
            classMapper.setDefaultType(Event.class);
            messageConverter.setClassMapper(classMapper);
            template.setMessageConverter(messageConverter);

            configurableBeanFactory.registerSingleton(service + "Template", template);

            if (!StringUtils.isEmpty(props.getReplyQueue())) {
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
                container.setQueueNames(props.getReplyQueue());
                container.setMessageListener(new MessageListenerAdapter(template));
                configurableBeanFactory.registerSingleton(service + "ListenerContainer", container);
                container.afterPropertiesSet(); // added this but not working either
                container.start(); // added this but not working either
            }
        }
    }

But when a response arrives on the reply queue, I get this error:

    java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': replyQueue1
        at org.springframework.util.Assert.state(Assert.java:70)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithFixed(RabbitTemplate.java:1312)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1251)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1218)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1189)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1156)

So it seems the SimpleMessageListenerContainer is not instantiated or configured correctly.

Do you know what the problem is?

My code for sending and receiving:

    @Autowired
    private ApplicationContext context;
    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private GatewayProperties properties;

    @PostMapping("/{service}")
    public ResponseEntity<Object> call(@PathVariable("service") String service, @RequestBody Event body) {
        ServiceProperties serviceProperties = properties.getRoutes().get(service);

        Queue queue = QueueBuilder.durable(serviceProperties.getQueue()).build();
        rabbitAdmin.declareQueue(queue);
        TopicExchange exchange = new TopicExchange(serviceProperties.getExchange());
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(serviceProperties.getRouting()));

        Queue replyQueue = null;
        if (!StringUtils.isEmpty(serviceProperties.getReplyQueue())) {
            replyQueue = QueueBuilder.durable(serviceProperties.getReplyQueue()).withArgument("x-dead-letter-exchange", serviceProperties.getDlExchange()).build();
            rabbitAdmin.declareQueue(replyQueue);
            Queue dlQueue = QueueBuilder.durable(serviceProperties.getDlQueue()).build();
            rabbitAdmin.declareQueue(dlQueue);
            TopicExchange dlqExchange = new TopicExchange(serviceProperties.getDlExchange());
            rabbitAdmin.declareExchange(dlqExchange);
            rabbitAdmin.declareBinding(BindingBuilder.bind(dlQueue).to(dlqExchange).with(serviceProperties.getReplyQueue()));
        }

        RabbitTemplate template = (RabbitTemplate) context.getBean(service + "Template");

        Event outputMessage = (Event) template.convertSendAndReceive(serviceProperties.getExchange(), serviceProperties.getRouting(), body, new CorrelationData(UUID.randomUUID().toString()));

        //...
    }

Best answer

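It's not clear why you are using a reply queue; RabbitMQ now provides a direct reply-to mechanism that removes most of the reasons for using a fixed reply queue (one exception is if you need an HA reply queue).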
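
To illustrate the direct reply-to route, here is a minimal sketch, assuming a Spring AMQP version and a broker that both support direct reply-to. The class and method names (`DirectReplyTemplates`, `create`) are hypothetical. The point is what is absent: no `setReplyAddress(...)` call and no listener container. When no reply address is set, `RabbitTemplate` uses direct reply-to if the broker supports it and otherwise falls back to a temporary reply queue.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

// Hypothetical helper, not part of Spring AMQP or of the question's code.
public final class DirectReplyTemplates {

    private DirectReplyTemplates() {
    }

    /** Builds a request/reply template that relies on the default reply handling. */
    public static RabbitTemplate create(ConnectionFactory connectionFactory,
                                        String exchange,
                                        String routingKey,
                                        long replyTimeoutMillis) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(exchange);
        template.setRoutingKey(routingKey);
        template.setReplyTimeout(replyTimeoutMillis);
        // The question's class mapper for Event is omitted to keep the sketch self-contained.
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        // Deliberately no setReplyAddress(...) and no SimpleMessageListenerContainer.
        return template;
    }
}
```

Note that the dead-letter arguments the question declares on the fixed reply queue have no counterpart here, so this only fits if error handling on replies can be done another way.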

That said, the problem is that you are wrapping the template in a MessageListenerAdapter. That is unnecessary (and wouldn't work anyway): the template implements MessageListener.
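
A sketch of that fix, under the assumption that the rest of the question's `createTemplate` method stays as it is: the container receives the template itself as its listener, exactly as the hard-coded bean version does. The helper names (`ReplyContainers`, `replyContainerFor`) are hypothetical. As far as I understand, the container recognizes the template through the interfaces it implements, and an adapter in between hides them, which would explain the "not configured as MessageListener" assertion.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

// Hypothetical helper, not part of Spring AMQP or of the question's code.
public final class ReplyContainers {

    private ReplyContainers() {
    }

    /** Builds and starts a reply container whose listener is the template itself. */
    public static SimpleMessageListenerContainer replyContainerFor(ConnectionFactory connectionFactory,
                                                                   RabbitTemplate template,
                                                                   String replyQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(replyQueue);
        // Pass the template directly; do not wrap it in a MessageListenerAdapter.
        container.setMessageListener(template);
        // registerSingleton(...) performs no initialization callbacks, so the
        // question's manual afterPropertiesSet() and start() calls are kept.
        container.afterPropertiesSet();
        container.start();
        return container;
    }
}
```

The template passed in must have its reply address set to the same queue name, as in the question (`template.setReplyAddress(props.getReplyQueue())`), and the queue must already exist on the broker when the container starts.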

Regarding "java - Spring AMQP: dynamically creating RabbitTemplate and SimpleMessageListenerContainer fails with 'RabbitTemplate is not configured as MessageListener'", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42972983/
