gpt4 book ai didi

java - 在 RabbitMQ 和 Spring 中是否可以选择创建只接受高优先级消息的消费者?

转载 作者:行者123 更新时间:2023-12-02 01:20:55 31 4
gpt4 key购买 nike

我正在创建一个应用程序,该应用程序使用 RabbitMQ 向消费者发送消息以进行耗时的处理。但是,我需要对消息进行优先级排序。当高优先级的消息到达时,即使所有消费者实例都在处理其他消息,也必须处理该消息。

据我所知,在 Spring Boot 和 RabbitMQ 中不可能抢占处理低优先级消息并切换到处理高优先级消息。

是否可以创建只接受高优先级消息的消费者,或者当所有其他都忙且高优先级消息到达时动态运行额外的消费者组?

我尝试使用 x-max-priority=10 标志添加队列并增加消费者数量,但这并不能解决我的问题。

假设我们运行 50 个消费者并发送 50 条低优先级消息。当执行耗时的处理时,新消息以高优先级到达,但无法立即处理,因为所有 50 个消费者都忙。

配置中有一部分设置消费者数量

@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(50);
factory.setMaxConcurrentConsumers(100);
return factory;
}

有没有办法创建一组接受高优先级消息(例如高于 0)的消费者,或者为高优先级消息动态创建消费者?

最佳答案

我不知道如何实现您所描述的先发制人策略,但您可以考虑许多替代方案。

优先级设置

首先要考虑的是 RabbitMQ 本身的优先级支持。

请考虑 Gavin M. Roy 的《RabbitMQ in Depth》中的摘录:

“As of RabbitMQ 3.5.0, the priority field has been implemented as per the AMQP specification. It’s defined as an integer with possible values of 0 through 9 to be used for message prioritization in queues. As specified, if a message with a priority of 9 is published, and subsequently a message with a priority of 0 is published, a newly connected consumer would receive the message with the priority of 0 before the message with a priority of 9”.

例如

rabbitTemplate.convertAndSend("Hello World!", message -> {
MessageProperties properties = MessagePropertiesBuilder.newInstance()
.setPriority(0)
.build();
return MessageBuilder.fromMessage(message)
.andProperties(properties)
.build();
});

基于优先级的交换

第二种选择是定义主题交换并定义考虑您的优先级的路由键。

例如,考虑使用 EventName.Priority 模式的路由键来交换事件,例如OrderPlaced.HighOrderPlaced.NormalOrderPlaced.Low

基于此,您可以有一个队列仅绑定(bind)到高优先级的订单,即 OrderPlaced.High 以及许多专门用于该队列的消费者。

例如

String routingKey = String.format("%s.%s", event.name(), event.priority());
rabbit.convertAndSend(routingKey, event);

使用如下监听器,其中队列 high-priority-orders 绑定(bind)到事件 OrderPlaced 和优先级的 events 交换High 使用路由键 OrderPlaced.High

@Component
@RabbitListener(queues = "high-priority-orders", containerFactory="orders")
public class HighPriorityOrdersListener {

@RabbitHandler
public void onOrderPlaced(OrderPlacedEvent orderPlaced) {
//...
}
}

显然,您需要一个专用的线程池(在上面的orders容器工厂中)来处理高优先级请求。

关于java - 在 RabbitMQ 和 Spring 中是否可以选择创建只接受高优先级消息的消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57712856/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com