gpt4 book ai didi

java - Spring AMQP——@RabbitListener 是在幕后轮询吗?

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:10:40 25 4
gpt4 key购买 nike

总结

我想异步处理来自 AMQP/RabbitMQ 队列的消息。我为此实现了一个 @RabbitListener 方法(来自 spring-rabbit),但似乎这个监听器实际上正在轮询我的队列。这是可以预料的吗?我本来希望 RabbitMQ 以某种方式通知监听器,而不必轮询。

如果符合预期,我是否也可以通过 Spring AMQP 以某种方式在不进行轮询的情况下异步使用消息?

我观察到的

当我发送一条消息时,它被听众正确接收。我仍然看到连续的日志消息流,表明监听器继续轮询空队列:


15:41:10.543 [pool-1-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - ConsumeOK : Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.544 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [], routingKey = [myQueue]
Sent: Hello World
15:41:10.559 [pool-1-thread-4] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.560 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'Hello World'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myQueue, deliveryTag=1, messageCount=0])
15:41:10.571 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=Hello World, headers={timestamp=1435844470571, id=018f39f6-ebca-aabf-7fe3-a095e959f65d, amqp_receivedRoutingKey=myQueue, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=myQueue, amqp_consumerTag=amq.ctag-bUsK4KQN6_QHzf8DoDC_ww, amqp_contentEncoding=UTF-8, contentType=text/plain, amqp_deliveryTag=1, amqp_redelivered=false}]]
Received: Hello World
15:41:10.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:11.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:12.583 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0

最后一条日志消息基本上每秒无限重复。

我的测试代码

前两种方法可能是最有趣的部分;剩下的主要是Spring的配置:

@Configuration
@EnableRabbit
public class MyTest {

public static void main(String[] args) throws InterruptedException {
try (ConfigurableApplicationContext appCtxt =
new AnnotationConfigApplicationContext(MyTest.class)) {
// send a test message
RabbitTemplate template = appCtxt.getBean(RabbitTemplate.class);
Queue queue = appCtxt.getBean(Queue.class);
template.convertAndSend(queue.getName(), "Hello World");
System.out.println("Sent: Hello World");

// Now that the application with its message listeners is running,
// block this thread forever; make sure, though, that the
// application context can sanely be closed.
appCtxt.registerShutdownHook();
Object blockingObj = new Object();
synchronized (blockingObj) {
blockingObj.wait();
}
}
}

@RabbitListener(queues = "#{ @myQueue }")
private void processHello(@Payload String msg,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
throws IOException {
System.out.println("Received: " + msg);
channel.basicAck(deliveryTag, false);
}

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnFactory());
}

@Bean
public ConnectionFactory rabbitConnFactory() {
return new CachingConnectionFactory();
}

@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory result =
new SimpleRabbitListenerContainerFactory();
result.setConnectionFactory(rabbitConnFactory());
result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return result;
}

@Bean
public Queue myQueue() {
return new Queue("myQueue", false);
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(rabbitConnFactory());
}
}

最佳答案

不是轮询rabbitmq;当消息从 rabbit 异步到达时,它被放置在消费者的内部队列中;将消息交给阻塞的监听线程,等待消息的到来。

您看到的 DEBUG 消息是在监听器线程超时等待来自 rabbitmq 的新消息到达之后出现的。

您可以增加 receiveTimeout 以减少日志,或者简单地禁用 BlockingQueueConsumer 的 DEBUG 日志记录。

增加超时将使容器对容器 stop() 请求的响应变慢。

编辑:

回应您在下方的评论...

是的,我们可以中断线程,但它比这更复杂。当 txSize > 1 时,接收超时也用于确认消息。

假设您只想确认每 20 条消息(而不是每条消息)。人们这样做是为了提高高容量环境中的性能。超时也用于ack(txSize实际上是每n条消息或超时)。

现在,假设有 19 条消息到达,然后在 60 秒内没有一条消息到达,您的超时时间为 30 秒。

这意味着这 19 条消息在很长一段时间内都未被确认。默认配置下,第19条消息到达后1秒发送ack。

这个超时确实没有什么开销(我们简单地循环并再次等待),所以增加它是不寻常的。

此外,当容器在上下文关闭时停止时,人们一直在停止和启动容器。

关于java - Spring AMQP——@RabbitListener 是在幕后轮询吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31186583/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com