gpt4 book ai didi

java - 处理死信队列消息代理独立的方式

转载 作者:行者123 更新时间:2023-12-02 09:12:15 25 4
gpt4 key购买 nike

我有一个项目,目前在下面使用 Spring Cloud Streams 和 RabbitMQ。我已经实现了一个逻辑 based on the documentation 。见下文:

@Component
public class ReRouteDlq {

private static final String ORIGINAL_QUEUE = "so8400in.so8400";

private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

private static final String X_RETRIES_HEADER = "x-retries";

private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}

@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}

}

它达到了预期的效果,但是,它绑定(bind)到 RabbitMQ,并且我的公司计划在一两年内停止使用这个消息代理(不知道为什么,一定是一些疯狂的业务)。因此,我想实现相同的功能,但将其与任何消息代理分离。

我尝试以这种方式更改rePublish方法,但它不起作用:

    @StreamListener(Sync.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}

它失败了,因为 Message 类具有不可变的 header - 在 put 尝试中抛出异常,表示您无法更改其值(使用 org.springframework. messages.Message 类)。

有没有办法以独立于消息代理的方式实现这个死信队列处理程序?

最佳答案

使用

MessageBuilder.fromMessage(message)
.setHeader("foo", "bar")
...
.build();

请注意 @StreamListener 中的消息是一个 spring-messaging Message<?> ,不是 spring-amqp Message并且无法使用这种方式发送;您需要一个输出绑定(bind)来将消息发送到。

关于java - 处理死信队列消息代理独立的方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59309419/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com