gpt4 book ai didi

RocketMQ 延时级别配置方式

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 38 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章RocketMQ 延时级别配置方式由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等.

其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推.

如何配置:

在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 。

描述了各级别与延时时间的对应映射关系.

这个配置项配置了从1级开始各级延时的时间,如1表示延时1s,2表示延时5s,14表示延时10m,可以修改这个指定级别的延时时间; 。

时间单位支持:s、m、h、d,分别表示秒、分、时、天; 。

默认值就是上面声明的,可手工调整; 。

默认值已经够用,不建议调整【仅供参考,还是根据实际需要调整。调整默认值时注意同时要修改时间对应的level级别的值】 。

如何发送延时消息:

发送延时消息只需要在客户端(rocketmq-client端)待发送的消息( com.alibaba.rocketmq.common.message.Message )中设置延时级别delayLevel即可.

?
1
2
3
Message msg = new Message(topicName, "" ,keys,message.getBytes());
msg.setDelayTimeLevel(delayLevel);
SendResult sendResult = getMQProducer.send(msg);

RocketMQ定时(延迟)消息

RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息.

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h 。

延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存.

生产 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerDelay {
     public static void main(String[] args) throws Exception {
         DefaultMQProducer producer = new DefaultMQProducer( "please_rename_unique_group_name" );
         producer.setNamesrvAddr( "192.168.10.11:9876" );
         producer.start();
         Message msg1 = new Message(
                 JmsConfig.TOPIC,
                 "订单001" .getBytes());
         msg1.setDelayTimeLevel( 2 ); //延迟5秒
         Message msg2 = new Message(
                 JmsConfig.TOPIC,
                 "订单001" .getBytes());
         msg2.setDelayTimeLevel( 4 ); //延迟30秒
         SendResult sendResult1 = producer.send(msg1);
         SendResult sendResult2 = producer.send(msg2);
         System.out.println( "Product1-同步发送-Product信息={}" + sendResult1);
         System.out.println( "Product2-同步发送-Product信息={}" + sendResult2);
         producer.shutdown();
     }
}

消费 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerDelay {
     public static void main(String[] args) throws Exception {
         // 实例化消费者
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "please_rename_unique_group_name" );
         // 设置NameServer的地址
         consumer.setNamesrvAddr( "192.168.10.11:9876" );
         // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
         consumer.subscribe(JmsConfig.TOPIC, "*" );
         // 注册消息监听者
         consumer.registerMessageListener( new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                 for (MessageExt message : messages) {
                     // Print approximate delay time period
                     System.out.println( "Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later" );
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         // 启动消费者
         consumer.start();
     }
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我.

原文链接:https://blog.csdn.net/weixin_38951207/article/details/79022875 。

最后此篇关于RocketMQ 延时级别配置方式的文章就讲到这里了,如果你想了解更多关于RocketMQ 延时级别配置方式的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

38 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com