spring-boot - Spring Boot + RabbitMQ - DLQ queue not working

Reposted. Author: 行者123. Updated: 2023-12-02 19:16:38

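I have set up a DLQ and a DLX, but failed messages are not redirected to the DLQ. I tried sending messages to MESSAGES.EXCHANGE both from the Java application and from the RabbitMQ server. In both cases the message is received, but after the exception is thrown the message should be redirected to DLX.MESSAGES.EXCHANGE, and that is not happening.

Below are the Java code and screenshots of the RabbitMQ server. Everything looks correct to me; I cannot find any problem in the code or on the RabbitMQ server.

Queue setup code -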

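import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration // needed so that Spring registers the @Bean methods below
public class DLQAmqpConfiguration {
    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    // Main queue: messages rejected without requeue go to the dead-letter exchange.
    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    // Fanout dead-letter exchange: the routing key is ignored.
    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}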

Producer -

    this.template.convertAndSend(DLQAmqpConfiguration.MESSAGES_EXCHANGE,
            DLQAmqpConfiguration.ROUTING_KEY_MESSAGES_QUEUE, message);

Consumer -

    @RabbitListener(queues = DLQAmqpConfiguration.MESSAGES_QUEUE)
    public void receiveMessage(Message message) throws BusinessException {
        System.out.println("Received failed message, re-queueing: " + message.toString());
        System.out.println("Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new BusinessException();
    }

    // this listener is never invoked
    @RabbitListener(queues = DLQAmqpConfiguration.DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

Exchange - [screenshot not reproduced]

Queues - [two screenshots not reproduced]

Log -

Received failed message, re-queueing: (Body:'[B@55c36bc9(byte[26])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=true, receivedExchange=MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=5444, consumerTag=amq.ctag-KrxkDPlc_uoqHOx_bbnvnA, consumerQueue=MESSAGES.QUEUE])
Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 21:36:33.460 WARN 13192 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.RabbitmqApplication.receiveMessage(org.springframework.amqp.core.Message) throws com.example.rabbitmq.errorhandler.BusinessException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: com.example.rabbitmq.errorhandler.BusinessException: null

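Best answer

You must either set spring.rabbitmq.listener.simple.default-requeue-rejected=false (or ...direct... if you use the direct container instead of the simple container) or throw an AmqpRejectAndDontRequeueException.

Otherwise, the failed message is requeued and redelivered.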
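To see why the original listener loops while the DLQ stays empty, here is a toy in-memory model of that requeue decision in plain Java. It is only a sketch and does not use Spring AMQP or RabbitMQ. The names `RequeueModel`, `run`, `Result` and `maxDeliveries` are made up for illustration. `RejectAndDontRequeue` is a hypothetical stand-in for `AmqpRejectAndDontRequeueException`. The delivery cap exists only so the simulation terminates; a real broker would keep redelivering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class RequeueModel {

    /** Hypothetical stand-in for Spring AMQP's AmqpRejectAndDontRequeueException. */
    public static class RejectAndDontRequeue extends RuntimeException {
        public RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /** Outcome of one simulated run. */
    public static class Result {
        public final int deliveries;           // how often the listener was invoked
        public final List<String> deadLetters; // messages that reached the dead-letter queue

        Result(int deliveries, List<String> deadLetters) {
            this.deliveries = deliveries;
            this.deadLetters = deadLetters;
        }
    }

    /**
     * Delivers one message ("foo") to the listener until it is either
     * consumed, dead-lettered, or the delivery cap is reached.
     */
    public static Result run(boolean defaultRequeueRejected, Consumer<String> listener, int maxDeliveries) {
        Deque<String> mainQueue = new ArrayDeque<>();
        List<String> deadLetters = new ArrayList<>();
        mainQueue.add("foo");
        int deliveries = 0;
        while (!mainQueue.isEmpty() && deliveries < maxDeliveries) {
            String message = mainQueue.poll();
            deliveries++;
            try {
                listener.accept(message); // success: acknowledged, message is gone
            } catch (RuntimeException e) {
                // Requeue only if the default says so AND the listener did not
                // explicitly ask for "reject and don't requeue".
                boolean requeue = defaultRequeueRejected && !(e instanceof RejectAndDontRequeue);
                if (requeue) {
                    mainQueue.addFirst(message); // redelivered to the same listener
                } else {
                    deadLetters.add(message);    // routed via the dead-letter exchange
                }
            }
        }
        return new Result(deliveries, deadLetters);
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> { throw new IllegalStateException("fail"); };
        Consumer<String> rejecting = m -> { throw new RejectAndDontRequeue("fail"); };

        Result looping = run(true, failing, 5);
        Result viaProperty = run(false, failing, 5);
        Result viaException = run(true, rejecting, 5);

        // prints: deliveries=5, dead letters=[]
        System.out.println("deliveries=" + looping.deliveries + ", dead letters=" + looping.deadLetters);
        // prints: deliveries=1, dead letters=[foo]
        System.out.println("deliveries=" + viaProperty.deliveries + ", dead letters=" + viaProperty.deadLetters);
        // prints: deliveries=1, dead letters=[foo]
        System.out.println("deliveries=" + viaException.deliveries + ", dead letters=" + viaException.deadLetters);
    }
}
```

The first run matches the question's symptom: the message keeps coming back (note redelivered=true in the question's log) and nothing is dead-lettered. The other two runs correspond to the two fixes named above.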

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So63620066Application {

    public static void main(String[] args) {
        SpringApplication.run(So63620066Application.class, args);
    }

    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";

    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";

    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";

    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    // Always fails; with default-requeue-rejected=false the message is dead-lettered.
    @RabbitListener(queues = MESSAGES_QUEUE)
    public void receiveMessage(Message message) {
        System.out.println("Received failed message, re-queueing: " + message.toString());
        System.out.println(
                "Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new RuntimeException("fail");
    }

    // Receives the dead-lettered message from the DLQ.
    @RabbitListener(queues = DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

    // Sends one test message on startup.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(MESSAGES_EXCHANGE,
                    ROUTING_KEY_MESSAGES_QUEUE, "foo");
        };
    }

}

Property -

spring.rabbitmq.listener.simple.default-requeue-rejected=false

Log -

Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 12:49:41.056 WARN 11489 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.demo.So63620066Application.receiveMessage(org.springframework.amqp.core.Message)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.lang.RuntimeException: fail
at com.example.demo.So63620066Application.receiveMessage(So63620066Application.java:71) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
... 13 common frames omitted

Received failed message: (Body:'foo' MessageProperties [headers={x-first-death-exchange=MESSAGES.EXCHANGE, x-death=[{reason=rejected, count=1, exchange=MESSAGES.EXCHANGE, time=Thu Aug 27 12:49:41 EDT 2020, routing-keys=[ROUTING_KEY_MESSAGES_QUEUE], queue=MESSAGES.QUEUE}], x-first-death-reason=rejected, x-first-death-queue=MESSAGES.QUEUE}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DLX.MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=3, consumerTag=amq.ctag--VIXT0V3hhBrlfTFqI5uxg, consumerQueue=DLQ.MESSAGES.QUEUE])

Regarding spring-boot - Spring Boot + RabbitMQ - DLQ queue not working, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/63620066/
