gpt4 book ai didi

java - 如何设置 spring-rabbitmq 监听器的接收消息速率

转载 作者:行者123 更新时间:2023-12-04 12:04:41 25 4
gpt4 key购买 nike

我正在使用 spring-rabbitmq ,并且可以成功获取消息。
但是当我调试时,我发现监听器创建了一个线程,它会问
每 1 秒发送一次消息。我认为速率太高了。我想要做的是将速率设置为 1 分钟或任何其他值。
我搜索了很多但不起作用

我的springrabbit.xml:

<rabbit:listener-container connection-factory="connectionFactory"  message-converter="jsonMessageConverter" >
<rabbit:listener queues="notification" ref="messageReceiver"/>
</rabbit:listener-container>

我的Java代码:
@Override
public void onMessage(Message message) { System.out.println("messagebody: "+new String(message.getBody()));
LOGGER.info(dateFormatUtil.getDateFormat(new Date())+new String(message.getBody()));
boolean result=false;
SendSingleEmailService sendSingleEmailService = new SendSingleEmailService();
try {
result =sendSingleEmailService.send(new String(message.getBody()));
} catch (FileNotFoundException e) {
LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] message is null!");
e.printStackTrace();
}
if(!result) {
try{
throw new Exception();
}catch (FileNotFoundException e) {
throw new RuntimeException(e);
}catch (Exception e) {
throw new RuntimeException(e);
}finally {
LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] Send Email failed!");
}
}


}

部分调试结果如下:
[2017-08-16 18:23:08,595]DEBUG  4286[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:09,600]DEBUG 5291[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:10,602]DEBUG 6293[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:11,603]DEBUG 7294[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:12,609]DEBUG 8300[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:13,612]DEBUG 9303[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:14,615]DEBUG 10306[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:15,617]DEBUG 11308[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:16,618]DEBUG 12309[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:17,619]DEBUG 13310[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.203.151:5672/,1), acknowledgeMode=AUTO local queue size=0

最佳答案

您可以增加receiveTimeout - 见 Message Listener Container Configuration .

但是,容器对 stop() 的响应会降低。要求。

我认为您过度关注轮询率 - 轮询用于暂存传递消息的内部队列的开销很小。

如果它只是您想要删除的日志“噪音”(调试时),请设置 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer将类别记录到 INFOWARN .

即将发布的 2.0 版本有一个新的 DirectMessageListenerContainer它不会轮询内部队列并消除此问题。 Info here .

编辑

listener still ask rabbitmq for messages every 1s



如果您仍然每 1 秒看到一次调试消息,则说明您尚未配置 receiveTimeout适本地;它不是“询问rabbitmq”消息,线程在等待 receiveTimeout 后唤醒(并发现兔子没有发送新消息)所以它可以对 stop() 使用react;然后它再次 hibernate ,直到有新消息到达或再次超时。如果没有消息可用,则不会与代理交互 - 消息由代理推送。

也许您误解了监听器容器的用途。它适用于消息驱动的应用程序——你不能“减慢”消息到达的速度——它们是由代理推送的。

如果您希望每分钟只接收一次消息,您应该使用 RabbitTemplate receive() (或 receiveAndConvert() )方法而不是消息监听器容器。

关于java - 如何设置 spring-rabbitmq 监听器的接收消息速率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45711057/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com