- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
大家好,我是Leo!今天来和大家分享RocketMQ的一些用法.
Producer: 用于生产消息的运行实体.
Topic: 主题,用于消息传输和存储的分组容器.
MessageQueue: 消息传输和存储的实际单元容器.
Message: 消息传输的最小单元.
ConsumerGroup: 消费者组.
Consumer: 消费者.
Subscription: 订阅关系,发布订阅模式中消息过滤、重试、消费进度的规则配置.
MQ的明显优势有3个.
应用解耦: 以多服务为例,用户下单,需要通知订单服务和库存服务,我们可以通过MQ消息来解除下单和库存系统的耦合.
异步提速: 以秒杀为例,我们可以先返回秒杀结果,后续再通过MQ异步消息去插入记录和扣减库存等,减少调用的链路长度.
削峰填谷: 将某一时间内的请求量分摊到更多时间处理,比如系统A一秒只能处理10000个请求,但是我有100000个请求需要处理,我可以将请求发到MQ中,再分成10秒去消费这些请求.
当然MQ也有 劣势 , 系统可用性降低 , 系统复杂度提高 , 一致性问题 .
主要包括Producer、Broker、Consumer、NameServer Cluster.
可以通过设置不同的消费者组 。
不同组通过 不同的消费者组 既可以实现同时收到一样数量的消息, 那同一个消费者组需要怎样才能收到同样数量的消息呢?
// 消费者消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
默认是集群模式CLUSTERING,设置成广播模式 。
既可以实现一对多的发送.
同步消息需要阻塞等待消息发送结果的返回 。
public class ProducerDemo {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message();
message.setTopic("MQLearn");
message.setTags("1.0.0");
message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(message);
if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println(result);
System.out.println("发送成功:" + message);
}
producer.shutdown();
}
}
异步消息需要实现发送成功和失败的回调函数.
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.246.140:9876");
producer.start();
// 异步消息
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic("topic7");
message.setTags("1.0.0");
message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功的回调方法
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
// 发送失败的回调方法
System.out.println(e);
}
});
}
TimeUnit.SECONDS.sleep(10);
System.out.println("异步发送完成!");
}
}
单向消息就类似UDP,只顾单向发送,不管是否发送成功, 常用于日志收集等场景.
public class SingleDirectionProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 单向消息
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic("topic8");
message.setTags("1.0.0");
message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
}
System.out.println("带向发送完成!");
}
}
RocketMQ提供的定时消息并不能指定在什么时间点去投递消息。而是根据设定的等待时间,起到延时到达的缓冲作用在RocketMQ中,延时消息的delayTimeLevel支持以下级别
1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h 。
// 设置消息延时级别
message.setDelayTimeLevel(3);
批量消息支持一次发送多条消息.
注意:
批量消息需要有相同的topic 。
不能是延时消息 。
消息内容不能超过4M,可以通过producer.setMaxMessageSize()和broker进行设置设置(可以通过拆分多次发送) 。
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
List<Message> list = new ArrayList<>();
// 批量消息
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic("topic10");
message.setTags("1.0.0");
message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
list.add(message);
}
SendResult result = producer.send(list);
System.out.println(result);
TimeUnit.SECONDS.sleep(2);
System.out.println("发送完成!");
}
}
顺序消息支持按照消息的发送消息先后获取消息.
比如:我的一笔订单有多个流程需要处理,比如创建->付款->推送->完成.
通过同一笔订单放到一个队列中,这样就可以解决消费的无序问题.
通过实现MessageQueueSelector来选择一个队列.
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message();
// 模拟业务ID
int step = 10;
message.setTopic("topic12");
message.setTags("1.0.0");
message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 队列数
int size = mqs.size();
// 取模
int orderId = step;
return mqs.get(orderId % size);
}
}, null);
System.out.println("发送完成!");
}
}
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic12", "*");
// 消费者,起一个顺序监听,一个线程,只监听一个队列
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动了!");
}
}
RocketMQ中的事务消息支持在分布式场景下消息生产和本地事务的最终一致性.
大致流程为, 。
生产者先将消息发送至RocketMQ.
RocketMQBroker将消息持久化成功后,向生产者返回ACK消息确认已经返回成功,消息状态为暂时不能投递状态.
执行本地事务逻辑.
生产者根据事务执行结果向Broker提交commit或者rollback结果.
如果在断网或者重启情况下,未收到4的结果,或者返回Unknown未知状态,在固定时间对消息进行回查.
生产者收到消息回查后,需要本地事务执行的最终结果.
生产者对本地事务状态进行二次提交或确认.
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听
producer.setTransactionListener(new TransactionListener() {
// 正常事务监听
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 把消息保存到mysql数据库
boolean ok = false;
if (ok) {
System.out.println("正常执行事务过程");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
System.out.println("事务补偿过程");
return LocalTransactionState.UNKNOW;
//return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务补偿事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("事务补偿过程");
// sql select
if (true) {
} else {
}
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
String msg = "Hello Transaction Message!";
Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
TimeUnit.SECONDS.sleep(2);
System.out.println(transactionSendResult);
System.out.println("发送完成!");
}
}
在RocketMQ中的消息过滤功能能通过生产者和消费者对消息的属性和Tag进行定义,在消费端可以根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费.
支持两种方式:Tag标签过滤和SQL属性过滤.
Message message = new Message();
message.setTopic("topic11");
message.setTags("tag");
message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
message.putUserProperty("name", "zhangsan");
message.putUserProperty("age", "16");
SendResult result = producer.send(message);
subscribe方法subExpression参数也支持Tag过滤 。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("112.74.125.184:9876");
consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));
在SpringBoot项目中主要通过 RocketMQTemplate 进行消息的发送.
// 普通消息
rocketMQTemplate.convertAndSend("topic10", user);
rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
SendResult result = rocketMQTemplate.syncSend("topic10", user);
// 异步消息
rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("成功!");
}
@Override
public void onException(Throwable e) {
System.out.println(e);
}
}, 1000L);
// 单向消息
rocketMQTemplate.sendOneWay("topic10", user);
// 延时消息
rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3);
// 批量消息
rocketMQTemplate.syncSend("topic10", list, 1000);
消费者:在注解中可以实现根据Tag和SQL进行属性的过滤.
@Service
//@RocketMQMessageListener(
// consumerGroup = "group1",
// topic = "topic10",
// selectorExpression = "tag1 || tag2"
//)
@RocketMQMessageListener(
consumerGroup = "group1",
topic = "topic10",
selectorType = SelectorType.SQL92,
selectorExpression = "age > 16",
messageModel = MessageModel.BROADCASTING
)
public class UserConsumer implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
}
}
今天主要分享了一下RocketMQ的一些基础使用,包括各种类型的消息的使用,偏向于代码实现部分,对于原理篇没有过多涉及.
最后此篇关于RocketMQ的简单使用的文章就讲到这里了,如果你想了解更多关于RocketMQ的简单使用的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
什么是RocketMQ RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。 常见的MQ主要有
1、说说你们公司线上生产环境用的是什么消息中间件? 见【2、多个mq如何选型?】 2、多个mq如何选型? MQ 描述 Rabb
消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求 ConsumeReques
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
RocketMQ支持集群部署来保证高可用。它基于主从模式,将节点分为Master、Slave两个角色,集群中可以有多个Master节点,一个Master节点可以有多个Slave节点。Master节点
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
RocketMQ是通过 DefaultMQProducer 进行消息发送的,它实现了 MQProducer 接口, MQProducer 接口中定义了消息发送的方法,方法主要分为三大类:
当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。 数据校验 封装消息 首先Broker会创
在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的。 RocketMQ消息的消费以组为单位
NameServer是一个注册中心,提供服务注册和服务发现的功能。NameServer可以集群部署,集群中每个节点都是对等的关系(没有像ZooKeeper那样在集群中选举出一个Master节点),节
RocketMQ 4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用
消息存储 在 【RocketMQ】消息的存储 一文中提到,Broker收到消息后会调用 CommitLog 的asyncPutMessage方法写入消息,在DLedger模式下使用的是
RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡.
RocketMQ有两种获取消息的方式,分别为推模式和拉模式。 推模式 推模式在 【RocketMQ】消息的拉取 一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者
主从同步的实现逻辑主要在 HAService 中,在 DefaultMessageStore 的构造函数中,对 HAService 进行了实例化,并在start方法中,启动了 HAService :
在 【RocketMQ】消息的拉取 一文中可知,消费者在启动的时候,会创建消息拉取API对象 PullAPIWrapper ,调用pullKernelImpl方法向Broker发送拉取消息的
全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一
概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息; 预设值的延迟时间间隔为: 1s、 5s、 10s、 30s、 1m
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。 其中,level=0 级表示不延时,level=1 表示 1 级延时,le
我尝试给 rockerMQ broker 加注星标,但我收到了错误消息: There is insufficient memory for the Java Runtime Environment t
我是一名优秀的程序员,十分优秀!