gpt4 book ai didi

java - Spring amqp 拒绝监听器外部的消息

转载 作者:行者123 更新时间:2023-11-30 05:54:23 26 4
gpt4 key购买 nike

该应用程序使用java 10、spring amqp和rabbitmq。

系统有一个死信队列,我们​​可以在其中发送一些消息(由于数据库不可用,这些消息无法按预期进行处理)。

目前,每 X 秒检查一次数据库可用性,如果仅可用,我们会将消息重新排队到其原始队列。否则我们什么也不做,消息会保留在死信队列中。

当重新排队到原始队列时,消息可以再次返回到死信队列并看到 x-death header 计数不断增长。

由于某些原因,我们希望处理计数 >= 5(例如)的死信消息,并将其他消息重新排队到死信队列。

我需要首先对消息进行基本确认以检查x-死亡计数 header ,然后如果计数足够大,则将其发送到原始队列,否则在死信队列中重新排队。

我无法设法重新排队到死信队列,因为基本获取不在监听器内部:抛出 AmqpRejectAndDontRequeueException 不起作用,因为异常未在rabbitmq监听器对象内部抛出。

我尝试在 receiveAndCallback 方法中抛出异常,但这似乎并不更好:

rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {

@Override
public Object handle(Message message) {
Long messageXdeathCount = null;
if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
"x-death");
if (null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
resendsMessage(message);
} else {
// this does not reject the message
throw new AmqpRejectAndDontRequeueException("rejected");
}
return null;
}
});
return receive;
}

执行此方法后,消息没有像我预期的那样被拒绝,并且离开了队列(已被确认)。

这是交换和队列声明:

@Bean
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
admin().declareExchange(exchange);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
Queue queue = new Queue("queueName", true, false, false, args);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
admin().declareBinding(binding);
return exchange;
}

如何在不使用 AmqpRejectAndDontRequeueException 的情况下拒绝死信队列中的消息?交易所是否可以将 x-dead-letter-exchange 设置为 self?

感谢您的帮助

更新

我尝试了另一种方法,使用 channel 获取和拒绝:

// exchange creation
@Bean
public Exchange exchange() throws IOException {
Connection connection = connectionFactory().createConnection();
Channel channel = channel();
channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
channel.queueDeclare("queueName", true, false, false, args);
channel.queueBind("queueName", EXCHANGE, routingKey);
return exchange;
}

消息获取并确认或拒绝:

GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps =
messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if(response.getProps().getHeaders().get("x-death") == null) {
response.getProps().getHeaders().put("x-death", new ArrayList<>());
}
if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
}
((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
"count", messageXdeathCount + 1);
channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}

首先我意识到它很丑陋,然后消息在 get 和拒绝之间无法更新。有没有办法使用channel.basicReject并更新x-death count header?

最佳答案

receiveAndReply() 方法当前不提供对接收消息确认的控制。欢迎打开New Feature Request .

您可以使用监听器容器来获得所需的灵 active 。

编辑

您可以下拉至rabbitmq API...

rabbitTemplate.execute(channel -> {
// basicGet, basicPublish, ack/nack etc here
});

关于java - Spring amqp 拒绝监听器外部的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53429782/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com