gpt4 book ai didi

java - Spring AMQP - 使用带 TTL 的死信机制进行消息重新排队

转载 作者:行者123 更新时间:2023-11-30 02:42:24 25 4
gpt4 key购买 nike

这就像“休斯顿,我们在这里遇到问题”,在第一次尝试处理事件失败后,我需要安排/延迟消息 5 分钟。我在这种情况下实现了死信交换。

失败时的消息将路由至 DLX --> 重试队列,并在 TTL 为 5 分钟后返回工作队列以进行另一次尝试。

这是我正在使用的配置:

public class RabbitMQConfig {
@Bean(name = "work")
@Primary
Queue workQueue() {
return new Queue(WORK_QUEUE, true, false, false, null);
}

@Bean(name = "workExchange")
@Primary
TopicExchange workExchange() {
return new TopicExchange(WORK_EXCHANGE, true, false);
}

@Bean
Binding workBinding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(workQueue()).to(workExchange()).with("#");
}

@Bean(name = "retryExchange")
FanoutExchange retryExchange() {
return new FanoutExchange(RETRY_EXCHANGE, true, false);
}

@Bean(name = "retry")
Queue retryQueue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", WORK_EXCHANGE);
args.put("x-message-ttl", RETRY_DELAY); //delay of 5 min
return new Queue(RETRY_QUEUE, true, false, false, args);
}

@Bean
Binding retryBinding(Queue queue,FanoutExchange exchange) {
return BindingBuilder.bind(retryQueue()).to(retryExchange());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}

@Bean
Consumer receiver() {
return new Consumer();
}

@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}

生产者.java:

@GetMapping(path = "/hello")
public String sayHello() {
// Producer operation

String messages[];
messages = new String[] {" hello "};

for (int i = 0; i < 5; i++) {
String message = util.getMessage(messages)+i;

rabbitTemplate.convertAndSend("WorkExchange","", message);
System.out.println(" Sent '" + message + "'");
}
return "hello";
}

消费者.java:

public class Consumer {

@RabbitListener(queues = "WorkQueue")
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException, InterruptedException {

try {

System.out.println("message to be processed: " + message);
doWorkTwo(message);
channel.basicAck(tag, false);

} catch (Exception e) {
System.out.println("In the exception catch block");
System.out.println("message in dead letter exchange: " + message);
channel.basicPublish("RetryExchange", "", null, message.getBytes());

}

}

private void doWorkTwo(String task) throws InterruptedException {

int c = 0;
int b = 5;
int d = b / c;

}

}

对于我的场景使用死信交换是否是正确的方法,在重试队列中等待 5 分钟后,第二次尝试时它不会在重试队列中等待 5 分钟(我提到 TTL 为 5 分钟)并立即移至工作队列

我通过点击 localhost:8080/hello url 来运行此应用程序。

这是我更新的配置。

RabbitMQConfig.java:

@EnableRabbit
public class RabbitMQConfig {

final static String WORK_QUEUE = "WorkQueue";
final static String RETRY_QUEUE = "RetryQueue";
final static String WORK_EXCHANGE = "WorkExchange"; // Dead Letter Exchange
final static String RETRY_EXCHANGE = "RetryExchange";
final static int RETRY_DELAY = 60000; // in ms (1 min)

@Bean(name = "work")
@Primary
Queue workQueue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", RETRY_EXCHANGE);
return new Queue(WORK_QUEUE, true, false, false, args);
}

@Bean(name = "workExchange")
@Primary
DirectExchange workExchange() {
return new DirectExchange(WORK_EXCHANGE, true, false);
}

@Bean
Binding workBinding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(workQueue()).to(workExchange()).with("");
}

@Bean(name = "retryExchange")
DirectExchange retryExchange() {
return new DirectExchange(RETRY_EXCHANGE, true, false);
}

// Messages will drop off RetryQueue into WorkExchange for re-processing
// All messages in queue will expire at same rate
@Bean(name = "retry")
Queue retryQueue() {
Map<String, Object> args = new HashMap<String, Object>();
//args.put("x-dead-letter-exchange", WORK_EXCHANGE);
//args.put("x-message-ttl", RETRY_DELAY);
return new Queue(RETRY_QUEUE, true, false, false, null);
}

@Bean
Binding retryBinding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(retryQueue()).to(retryExchange()).with("");
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDefaultRequeueRejected(false);
/*factory.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(2).recoverer(new RejectAndDontRequeueRecoverer())
.backOffOptions(1000, 2, 5000)
.build()
});*/
return factory;
}

@Bean
Consumer receiver() {
return new Consumer();
}

@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}

消费者.java:

public class Consumer {

@RabbitListener(queues = "WorkQueue")
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long tag,
@Header(required = false, name = "x-death") HashMap<String, String> xDeath)
throws IOException, InterruptedException {

doWorkTwo(message);
channel.basicAck(tag, false);
}

private void doWorkTwo(String task) {
int c = 0;
int b = 5;
if (c < b) {
throw new AmqpRejectAndDontRequeueException(task);
}
}
}

最佳答案

如果您拒绝消息,因此代理将其路由到 DLQ,您可以检查 x-death header 。在这种情况下,我有一个 TTL 为 5 秒的 DLQ,并且来自主队列的消息的使用者拒绝了它;代理将其路由到 DLQ,然后过期并路由回主队列 - x-death header 显示重新路由操作的数量:

x-death header

关于java - Spring AMQP - 使用带 TTL 的死信机制进行消息重新排队,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41258684/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com