gpt4 book ai didi

java - 处理死信队列的配置

转载 作者:行者123 更新时间:2023-12-01 19:31:42 25 4
gpt4 key购买 nike

我有一个使用 Spring Cloud Streams - RabbitMQ 在微服务内交换消息的项目。对于我的项目来说至关重要的一件事是我不能丢失任何消息。

为了尽量减少失败,我计划如下:

  • 对队列中的消息使用默认重试方法
  • 配置死信队列以在一段时间后再次将消息放入队列
  • 为了避免无限循环,只允许将消息从死信队列重新发布到常规消息传递队列几次(假设为 5 次)。

我相信我可以使用以下配置来完成前两项:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=300000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=

#input
spring.cloud.stream.bindings.myInput.destination=my-queue
spring.cloud.stream.bindings.myInput.group=my-group

但是,我找不到searching on this reference guide如何做我想做的事(主要是如何配置从死信队列重新发布的最大数量)。我不完全确定我走在正确的道路上 - 也许我应该手动创建第二个队列并编写我想要的代码,并只为完全失败的消息留下死信(我必须定期检查并手动处理,因为我的系统不应该丢失任何消息)...

我是这些框架的新手,我需要你的帮助来配置我的......

最佳答案

This documentation for the rabbit binder展示了在一定次数的重试失败后如何向某些 parking 场队列发布死信。

@SpringBootApplication
public class ReRouteDlqApplication {

private static final String ORIGINAL_QUEUE = "so8400in.so8400";

private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

private static final String X_RETRIES_HEADER = "x-retries";

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}

@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}

@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}

}

第二个示例展示了如何使用延迟交换插件来延迟重试之间的时间。

@SpringBootApplication
public class ReRouteDlqApplication {

private static final String ORIGINAL_QUEUE = "so8400in.so8400";

private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

private static final String X_RETRIES_HEADER = "x-retries";

private static final String DELAY_EXCHANGE = "dlqReRouter";

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}

@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}

@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}

@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}

@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}

}

关于java - 处理死信队列的配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59254958/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com