gpt4 book ai didi

RabbitMQ保姆级教程最佳实践

转载 作者:我是一只小鸟 更新时间:2023-09-23 23:02:19 37 4
gpt4 key购买 nike

1、消息队列介绍

1、消息队列概念

1、MQ全称为Message Queue,消息队列(MQ)是⼀种应⽤程序对应⽤程序的通信⽅法。
应⽤程序通过读写出⼊队列的消息(针对应⽤程序的数据)来通信,⽽⽆需专⽤连接来
链接它们。
2、消息传递指的是程序之间通过在消息中发送数据进⾏通信,⽽不是通过直接调⽤彼此来
通信,直接调⽤通常是⽤于诸如远程过程调⽤的技术。

2、常⽤的消息队列产品

1、RabbitMQ 稳定可靠,数据⼀致,⽀持多协议,有消息确认,基于erlang语⾔
2、Kafka ⾼吞吐,⾼性能,快速持久化,⽆消息确认,⽆消息遗漏,可能会有有重复消息,依赖于zookeeper,成本⾼.
3、ActiveMQ 不够灵活轻巧,对队列较多情况⽀持不好.
4、RocketMQ 性能好,⾼吞吐,⾼可⽤性,⽀持⼤规模分布式,协议⽀持单⼀

⼆、RabbitMQ

1、RabbitMQ介绍

1、RabbitMQ是⼀个在AMQP基础上完成的,可复⽤的企业消息系统。他遵循MozillaPublic License开源协议。
2、AMQP,即Advanced Message Queuing Protocol, ⼀个提供统⼀消息服务的应⽤层标准
     ⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议
     的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语⾔等
     条件的限制。Erlang中的实现有 RabbitMQ等。
3、主要特性:
  • 保证可靠性 :使⽤⼀些机制来保证可靠性,如持久化、传输确认、发布确认
  • 灵活的路由功能
  • 可伸缩性:⽀持消息集群,多台RabbitMQ服务器可以组成⼀个集群
  • ⾼可⽤性 :RabbitMQ集群中的某个节点出现问题时队列仍然可⽤
  • ⽀持多种协议
  • ⽀持多语⾔客户端
  • 提供良好的管理界⾯
  • 提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
  • 提供插件机制:可通过插件进⾏多⽅⾯扩展

2、RabbitMQ安装和配置

具体参考: https://note.youdao.com/s/MKn2Jr8c 。

3、RabbitMQ逻辑结构

3、RabbitMQ⽤户管理

RabbitMQ默认提供了⼀个guests账号,但是此账号不能⽤作远程登录,也就是不能在管理系统的登录;我们可以创建⼀个新的账号并授予响应的管理权限来实现远程登录

1、逻辑结构

⽤户
虚拟主机
队列

2、⽤户管理

2.1、命令⾏⽤户管理

1、在linux中使⽤命令⾏创建⽤户 。

                            
                              ## 进⼊到rabbit_mq的sbin⽬录
cd 
                            
                            /usr/local/rabbitmq_server-
                            
                              3.7
                            
                            .
                            
                              0
                            
                            /
                            
                              sbin
## 新增⽤户
.
                            
                            /rabbitmqctl add_user ytao admin123
                          

2、设置⽤户级别 。

                            
                              ## ⽤户级别:
## 
                            
                            
                              1
                            
                            
                              .administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进⾏管理
## 
                            
                            
                              2
                            
                            
                              .monitoring 监控者 登录控制台、查看所有信息
## 
                            
                            
                              3
                            
                            
                              .policymaker 策略制定者 登录控制台、指定策略
## 
                            
                            
                              4
                            
                            
                              .managment 普通管理员 登录控制台
.
                            
                            /rabbitmqctl set_user_tags ytao administrator
                          

2.2、管理系统进⾏⽤户管理

管理系统登录:访问http://localhost:15672/

4、RabbitMQ⼯作⽅式

RabbitMQ提供了多种消息的通信⽅式—⼯作模式   https://www.rabbitmq.com/getstarted.html
消息通信是由两个⻆⾊完成:消息⽣产者(producer)和 消息消费者(Consumer)

1、简单模式

⼀个队列只有⼀个消费者

2、⼯作模式

多个消费者监听同⼀个队列

3、订阅模式

⼀个交换机绑定多个消息队列,每个消息队列有⼀个消费者监听

4、路由模式

⼀个交换机绑定多个消息队列,每个消息队列都由⾃⼰唯⼀的key,每个消息队列有⼀个消费者监听

5、RabbitMQ交换机和队列管理

1、创建队列

2、创建交换机

3、交换机绑定队列

6、在普通的Maven应⽤中使⽤MQ

1、简单模式

1.1、消息⽣产者

1、创建Maven项⽬ 。

2、添加RabbitMQ连接所需要的依赖 。

                            <!-- https:
                            
                              //
                            
                            
                              mvnrepository.com/artifact/com.rabbitmq/amqp-client --
                            
                            >
<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>
                            
                              4.10
                            
                            .
                            
                              0
                            
                            </version>
</dependency>
<!-- https:
                            
                              //
                            
                            
                              mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
                            
                            
<dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
 <version>
                            
                              1.7
                            
                            .
                            
                              25
                            
                            </version>
 <scope>test</scope>
</dependency>
<!-- https:
                            
                              //
                            
                            
                              mvnrepository.com/artifact/org.apache.commons/commonslang3 -->
                            
                            
<dependency>
 <groupId>org.apache.commons</groupId>
 <artifactId>commons-lang3</artifactId>
 <version>
                            
                              3.9
                            
                            </version>
</dependency>
                          

3、在resources⽬录下创建log4j.properties 。

                            log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao =
                            
                               DEBUG
log4j.logger.org.mybatis 
                            
                            =
                            
                               DEBUG
log4j.appender.A1
                            
                            =
                            
                              org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout
                            
                            =
                            
                              org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern
                            
                            =%-d{yyyy-MM-
                            
                              dd HH:mm:ss,SSS} [
                            
                            %t] [%c]-[%p] %m%n
                          

4、创建MQ连接工具类 。

                            
                              import
                            
                            
                               com.rabbitmq.client.Connection;

                            
                            
                              import
                            
                            
                               com.rabbitmq.client.ConnectionFactory;

                            
                            
                              import
                            
                            
                               java.io.IOException;

                            
                            
                              import
                            
                            
                               java.util.concurrent.TimeoutException;

                            
                            
                              public
                            
                            
                              class
                            
                            
                               ConnectionUtil {
    
                            
                            
                              public
                            
                            
                              static
                            
                             Connection getConnection() 
                            
                              throws
                            
                            
                               IOException,
    TimeoutException {
        
                            
                            
                              //
                            
                            
                              1.创建连接⼯⼚
                            
                            
        ConnectionFactory factory = 
                            
                              new
                            
                            
                               ConnectionFactory();
        
                            
                            
                              //
                            
                            
                              2.在⼯⼚对象中设置MQ的连接信息
                            
                            
                                      (ip,port,virtualhost,username,password)
         factory.setHost(
                            
                            "47.96.11.185"
                            
                              );
        factory.setPort(
                            
                            5672
                            
                              );
        factory.setVirtualHost(
                            
                            "host1"
                            
                              );
        factory.setUsername(
                            
                            "ytao"
                            
                              );
        factory.setPassword(
                            
                            "admin123"
                            
                              );
        
                            
                            
                              //
                            
                            
                              3.通过⼯⼚对象获取与MQ的链接
                            
                            
        Connection connection =
                            
                               factory.newConnection();
        
                            
                            
                              return
                            
                            
                               connection;
    }
}
                            
                          

5、消息⽣产者发送消息 。

                            
                              import
                            
                            
                               com.qfedu.mq.utils.ConnectionUtil;

                            
                            
                              import
                            
                            
                               com.rabbitmq.client.Channel;

                            
                            
                              import
                            
                            
                               com.rabbitmq.client.Connection;

                            
                            
                              public
                            
                            
                              class
                            
                            
                               SendMsg {
    
                            
                            
                              public
                            
                            
                              static
                            
                            
                              void
                            
                             main(String[] args) 
                            
                              throws
                            
                            
                               Exception{
        String msg 
                            
                            = "Hello HuangDaoJun!"
                            
                              ;
        Connection connection 
                            
                            =
                            
                               ConnectionUtil.getConnection();
        Channel channel 
                            
                            =
                            
                               connection.createChannel();
        
                            
                            
                              //
                            
                            
                              定义队列(使⽤Java代码在MQ中新建⼀个队列)
        
                            
                            
                              //
                            
                            
                              参数1:定义的队列名称
        
                            
                            
                              //
                            
                            
                              参数2:队列中的数据是否持久化(如果选择了持久化)
        
                            
                            
                              //
                            
                            
                              参数3: 是否排外(当前队列是否为当前连接私有)
        
                            
                            
                              //
                            
                            
                              参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否
                            
                            
                              还有数据))
        
                            
                            
                              //
                            
                            
                              参数5:设置当前队列的参数
        
                            
                            
                              //
                            
                            
                              channel.queueDeclare("queue7",false,false,false,null);
        
                            
                            
                              //
                            
                            
                              参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
        
                            
                            
                              //
                            
                            
                              参数2:⽬标队列名称
        
                            
                            
                              //
                            
                            
                              参数3:设置当前这条消息的属性(设置过期时间 10)
        
                            
                            
                              //
                            
                            
                              参数4:消息的内容
                            
                            
        channel.basicPublish("","queue1",
                            
                              null
                            
                            
                              ,msg.getBytes());
        System.out.println(
                            
                            "发送:" +
                            
                               msg);
        channel.close();
        connection.close();
    }
}
                            
                          

1.2、消息消费者

1、创建Maven项⽬
2、添加依赖
3、log4j.properties
4、ConnetionUtil.java
5、消费者消费消息
                                
                                  import
                                
                                
                                   com.qfedu.mq.utils.ConnectionUtil;

                                
                                
                                  import
                                
                                 com.rabbitmq.client.*
                                
                                  ;

                                
                                
                                  import
                                
                                
                                   java.io.IOException;

                                
                                
                                  import
                                
                                
                                   java.util.concurrent.TimeoutException;

                                
                                
                                  public
                                
                                
                                  class
                                
                                
                                   ReceiveMsg {
    
                                
                                
                                  public
                                
                                
                                  static
                                
                                
                                  void
                                
                                 main(String[] args) 
                                
                                  throws
                                
                                
                                   IOException, TimeoutException {
        Connection connection 
                                
                                =
                                
                                   ConnectionUtil.getConnection();
        Channel channel 
                                
                                =
                                
                                   connection.createChannel();
        Consumer consumer 
                                
                                = 
                                
                                  new
                                
                                
                                   DefaultConsumer(channel){
            @Override
            
                                
                                
                                  public
                                
                                
                                  void
                                
                                
                                   handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                                
                                
                                  byte
                                
                                
                                  [] body) 
                                
                                
                                  throws
                                
                                
                                   IOException {
                
                                
                                
                                  //
                                
                                
                                  body就是从队列中获取的数据
                                
                                
                String msg = 
                                
                                  new
                                
                                
                                   String(body);
                System.out.println(
                                
                                "接收:"+
                                
                                  msg);
            }
        };
        channel.basicConsume(
                                
                                "queue1",
                                
                                  true
                                
                                
                                  ,consumer);
    }
}
                                
                              

2、⼯作模式

⼀个发送者多个消费者

2.1、发送者

                          
                            public
                          
                          
                            class
                          
                          
                             SendMsg {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             Exception{
        System.out.println(
                          
                          "请输⼊消息:"
                          
                            );
        Scanner scanner 
                          
                          = 
                          
                            new
                          
                          
                             Scanner(System.in);
        String msg 
                          
                          = 
                          
                            null
                          
                          
                            ;
        
                          
                          
                            while
                          
                          (!"quit".equals(msg =
                          
                             scanner.nextLine())){
            Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
            Channel channel 
                          
                          =
                          
                             connection.createChannel();
            channel.basicPublish(
                          
                          "","queue2",
                          
                            null
                          
                          
                            ,msg.getBytes());
            System.out.println(
                          
                          "发送:" +
                          
                             msg);
            channel.close();
            connection.close();
        }
    }
}
                          
                        

2.2、消费者1

                          
                            public
                          
                          
                            class
                          
                          
                             ReceiveMsg {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             Exception {
        Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
        Channel channel 
                          
                          =
                          
                             connection.createChannel();
        Consumer consumer 
                          
                          = 
                          
                            new
                          
                          
                             DefaultConsumer(channel){
            @Override
            
                          
                          
                            public
                          
                          
                            void
                          
                          
                             handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                          
                          
                            byte
                          
                          [] body) 
                          
                            throws 
                          
                          
                            IOException {
                
                          
                          
                            //
                          
                          
                            body就是从队列中获取的数据
                          
                          
                String msg = 
                          
                            new
                          
                          
                             String(body);
                System.out.println(
                          
                          "Consumer1接收:"+
                          
                            msg);
                
                          
                          
                            if
                          
                          ("wait"
                          
                            .equals(msg)){
                    
                          
                          
                            try
                          
                          
                             {
                        Thread.sleep(
                          
                          10000
                          
                            );
                    }
                    
                          
                          
                            catch
                          
                          
                             (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        channel.basicConsume(
                          
                          "queue2",
                          
                            true
                          
                          
                            ,consumer);
    }
}
                          
                        

2.3、消费者2

                          
                            public
                          
                          
                            class
                          
                          
                             ReceiveMsg {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             IOException,
    TimeoutException {
        Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
        Channel channel 
                          
                          =
                          
                             connection.createChannel();
        Consumer consumer 
                          
                          = 
                          
                            new
                          
                          
                             DefaultConsumer(channel){
            @Override
            
                          
                          
                            public
                          
                          
                            void
                          
                          
                             handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                          
                          
                            byte
                          
                          [] body) 
                          
                            throws 
                          
                          
                            IOException {
                
                          
                          
                            //
                          
                          
                            body就是从队列中获取的数据
                          
                          
                String msg = 
                          
                            new
                          
                          
                             String(body);
                System.out.println(
                          
                          "Consumer2接收:"+
                          
                            msg);
            }
        };
        channel.basicConsume(
                          
                          "queue2",
                          
                            true
                          
                          
                            ,consumer);
    }
}
                          
                        

3、订阅模式

1、发送者 发送消息到交换机

                          
                            public
                          
                          
                            class
                          
                          
                             SendMsg {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             Exception{
        System.out.println(
                          
                          "请输⼊消息:"
                          
                            );
        Scanner scanner 
                          
                          = 
                          
                            new
                          
                          
                             Scanner(System.in);
        String msg 
                          
                          = 
                          
                            null
                          
                          
                            ;
        
                          
                          
                            while
                          
                          (!"quit".equals(msg =
                          
                             scanner.nextLine())){
            Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
            Channel channel 
                          
                          =
                          
                             connection.createChannel();
            channel.basicPublish(
                          
                          "ex1","",
                          
                            null
                          
                          
                            ,msg.getBytes());
            System.out.println(
                          
                          "发送:" +
                          
                             msg);
            channel.close();
            connection.close();
        }
    }
}
                          
                        

2、消费者1

                          
                            public
                          
                          
                            class
                          
                          
                             ReceiveMsg1 {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             Exception {
        Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
        Channel channel 
                          
                          =
                          
                             connection.createChannel();
        Consumer consumer 
                          
                          = 
                          
                            new
                          
                          
                             DefaultConsumer(channel){
            @Override
            
                          
                          
                            public
                          
                          
                            void
                          
                          
                             handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                          
                          
                            byte
                          
                          [] body) 
                          
                            throws 
                          
                          
                            IOException {
                
                          
                          
                            //
                          
                          
                            body就是从队列中获取的数据
                          
                          
                String msg = 
                          
                            new
                          
                          
                             String(body);
                System.out.println(
                          
                          "Consumer1接收:"+
                          
                            msg);
                
                          
                          
                            if
                          
                          ("wait"
                          
                            .equals(msg)){
                    
                          
                          
                            try
                          
                          
                             {
                        Thread.sleep(
                          
                          10000
                          
                            );
                    }
                    
                          
                          
                            catch
                          
                          
                             (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        channel.basicConsume(
                          
                          "queue3",
                          
                            true
                          
                          
                            ,consumer);
    }
}
                          
                        

3、消费者2

                          
                            public
                          
                          
                            class
                          
                          
                             ReceiveMsg2 {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             IOException,
    TimeoutException {
        Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
        Channel channel 
                          
                          =
                          
                             connection.createChannel();
        Consumer consumer 
                          
                          = 
                          
                            new
                          
                          
                             DefaultConsumer(channel){
            @Override
            
                          
                          
                            public
                          
                          
                            void
                          
                          
                             handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                          
                          
                            byte
                          
                          [] body) 
                          
                            throws 
                          
                          
                            IOException {
                
                          
                          
                            //
                          
                          
                            body就是从队列中获取的数据
                          
                          
                String msg = 
                          
                            new
                          
                          
                             String(body);
                System.out.println(
                          
                          "Consumer2接收:"+
                          
                            msg);
            }
        }
        ;
        channel.basicConsume(
                          
                          "queue4",
                          
                            true
                          
                          
                            ,consumer);
    }
}
                          
                        

4、路由模式

1、发送者 发送消息到交换机

                          
                            public
                          
                          
                            class
                          
                          
                             SendMsg {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             Exception{
        System.out.println(
                          
                          "请输⼊消息:"
                          
                            );
        Scanner scanner 
                          
                          = 
                          
                            new
                          
                          
                             Scanner(System.in);
        String msg 
                          
                          = 
                          
                            null
                          
                          
                            ;
        
                          
                          
                            while
                          
                          (!"quit".equals(msg =
                          
                             scanner.nextLine())){
            Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
            Channel channel 
                          
                          =
                          
                             connection.createChannel();
            
                          
                          
                            if
                          
                          (msg.startsWith("a"
                          
                            )){
                channel.basicPublish(
                          
                          "ex2","a",
                          
                            null
                          
                          
                            ,msg.getBytes());
            } 
                          
                          
                            else
                          
                          
                            if
                          
                          (msg.startsWith("b"
                          
                            )){
                channel.basicPublish(
                          
                          "ex2","b",
                          
                            null
                          
                          
                            ,msg.getBytes());
            }
            System.out.println(
                          
                          "发送:" +
                          
                             msg);
            channel.close();
            connection.close();
        }
    }
}
                          
                        

2、消费者1

                          
                            public
                          
                          
                            class
                          
                          
                             ReceiveMsg1 {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             Exception {
        Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
        Channel channel 
                          
                          =
                          
                             connection.createChannel();
        Consumer consumer 
                          
                          = 
                          
                            new
                          
                          
                             DefaultConsumer(channel){
            @Override
            
                          
                          
                            public
                          
                          
                            void
                          
                          
                             handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                          
                          
                            byte
                          
                          [] body) 
                          
                            throws 
                          
                          
                            IOException {
                
                          
                          
                            //
                          
                          
                            body就是从队列中获取的数据
                          
                          
                String msg = 
                          
                            new
                          
                          
                             String(body);
                System.out.println(
                          
                          "Consumer1接收:"+
                          
                            msg);
                
                          
                          
                            if
                          
                          ("wait"
                          
                            .equals(msg)){
                    
                          
                          
                            try
                          
                          
                             {
                        Thread.sleep(
                          
                          10000
                          
                            );
                    }
                    
                          
                          
                            catch
                          
                          
                             (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        channel.basicConsume(
                          
                          "queue5",
                          
                            true
                          
                          
                            ,consumer);
    }
}
                          
                        

3、消费者2

                          
                            public
                          
                          
                            class
                          
                          
                             ReceiveMsg2 {
    
                          
                          
                            public
                          
                          
                            static
                          
                          
                            void
                          
                           main(String[] args) 
                          
                            throws
                          
                          
                             IOException, TimeoutException {
        Connection connection 
                          
                          =
                          
                             ConnectionUtil.getConnection();
        Channel channel 
                          
                          =
                          
                             connection.createChannel();
        Consumer consumer 
                          
                          = 
                          
                            new
                          
                          
                             DefaultConsumer(channel){
            @Override
            
                          
                          
                            public
                          
                          
                            void
                          
                          
                             handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                          
                          
                            byte
                          
                          [] body) 
                          
                            throws 
                          
                          
                            IOException {
                
                          
                          
                            //
                          
                          
                            body就是从队列中获取的数据
                          
                          
                String msg = 
                          
                            new
                          
                          
                             String(body);
                System.out.println(
                          
                          "Consumer2接收:"+
                          
                            msg);
            }
        };
        channel.basicConsume(
                          
                          "queue6",
                          
                            true
                          
                          
                            ,consumer);
    }
}
                          
                        

7、在SpringBoot应⽤中使⽤MQ

SpringBoot应⽤可以完成⾃动配置及依赖注⼊——可以通过Spring直接提供与MQ的连接对象

1、消息⽣产者

1、创建SpringBoot应⽤,添加依赖 。

2、配置application.yml 。

                            
                              server:
  port: 
                            
                            9001
                            
                              
spring:
  application:
    name: producer
  rabbitmq:
    host: 
                            
                            47.96.11.185
                            
                              
    port: 
                            
                            5672
                            
                              
    virtual
                            
                            -
                            
                              host: host1
    username: ytao
    password: admin123
                            
                          

3、发送消息 。

                            
                              @Service

                            
                            
                              public
                            
                            
                              class
                            
                            
                               TestService {
    @Resource
    
                            
                            
                              private
                            
                            
                               AmqpTemplate amqpTemplate;
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                               sendMsg(String msg){
        
                            
                            
                              //
                            
                            
                              1. 发送消息到队列
                            
                            
        amqpTemplate.convertAndSend("queue1"
                            
                              ,msg);
        
                            
                            
                              //
                            
                            
                              2. 发送消息到交换机(订阅交换机)
                            
                            
        amqpTemplate.convertAndSend("ex1",""
                            
                              ,msg);
        
                            
                            
                              //
                            
                            
                              3. 发送消息到交换机(路由交换机)
                            
                            
        amqpTemplate.convertAndSend("ex2","a"
                            
                              ,msg);
    }
}
                            
                          

2、消息消费者

1、创建项⽬添加依赖
2、配置yml
3、接收消息
                              
                                @Service

                              
                              
                                //
                              
                              
                                @RabbitListener(queues = {"queue1","queue2"})
                              
                              
@RabbitListener(queues = "queue1"
                              
                                )

                              
                              
                                public
                              
                              
                                class
                              
                              
                                 ReceiveMsgService {
    @RabbitHandler
    
                              
                              
                                public
                              
                              
                                void
                              
                              
                                 receiveMsg(String msg){
        System.out.println(
                              
                              "接收MSG:"+
                              
                                msg);
    }
}
                              
                            

⼋、使⽤RabbitMQ传递对象

RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息

1、使⽤序列化对象

要求:
传递的对象实现序列化接⼝
传递的对象的包名、类名、属性名必须⼀致

  。

1、消息提供者 。

                            
                              @Service

                            
                            
                              public
                            
                            
                              class
                            
                            
                               MQService {
    @Resource
    
                            
                            
                              private
                            
                            
                                AmqpTemplate   amqpTemplate  
                              ;
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                               sendGoodsToMq(Goods goods){
        
                            
                            
                              //
                            
                            
                              消息队列可以发送 字符串、字节数组、序列化对象
                            
                            
        amqpTemplate.convertAndSend("","queue1"
                            
                              ,goods);
    }
}
                            
                          

2、消息消费者 。

                            
                              @Component

                                @RabbitListener(queues  
                            
                            
                               = "queue1") 
                            
                            
                              public
                            
                            
                              class
                            
                            
                               ReceiveService {
    @RabbitHandler
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                               receiveMsg(Goods goods){
        System.out.println(
                            
                            "Goods---"+
                            
                              goods);
    }
}
                            
                          

2、使⽤序列化字节数组

要求:
  传递的对象实现序列化接⼝
  传递的对象的包名、类名、属性名必须⼀致

  。

1、消息提供者 。

                            
                              @Service

                            
                            
                              public
                            
                            
                              class
                            
                            
                               MQService {
    @Resource
    
                            
                            
                              private
                            
                            
                               AmqpTemplate amqpTemplate;
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                               sendGoodsToMq(Goods goods){
        
                            
                            
                              //
                            
                            
                              消息队列可以发送 字符串、字节数组、序列化对象
                            
                            
                               byte[] bytes = 
                            
                            
                                SerializationUtils.serialize(goods);  
                              
        amqpTemplate.convertAndSend(
                            
                            "","queue1"
                            
                              ,bytes);
    }
}
                            
                          

2、消息消费者 。

                            
                              @Component
@RabbitListener(queues 
                            
                            = "queue1"
                            
                              )

                            
                            
                              public
                            
                            
                              class
                            
                            
                               ReceiveService {
    @RabbitHandler
    
                            
                            
                              public
                            
                            
                              void
                            
                             receiveMsg(
                            
                              byte
                            
                            
                              [] bs){
        Goods goods 
                            
                            =
                            
                                (Goods) SerializationUtils.deserialize(bs);  
                              
        System.out.println(
                            
                            "byte[]---"+
                            
                              goods);
    }
}
                            
                          

3、使⽤JSON字符串传递

要求:对象的属性名⼀直

  。

1、消息提供者 。

                            
                              @Service

                            
                            
                              public
                            
                            
                              class
                            
                            
                               MQService {
    @Resource
    
                            
                            
                              private
                            
                            
                               AmqpTemplate amqpTemplate;
    
                            
                            
                              public
                            
                            
                              void
                            
                             sendGoodsToMq(Goods goods) 
                            
                              throws 
                            
                            
                              JsonProcessingException {
        
                            
                            
                              //
                            
                            
                              消息队列可以发送 字符串、字节数组、序列化对象
                            
                            
                               ObjectMapper objectMapper = new ObjectMapper(); String msg = 
                            
                            
                                objectMapper.writeValueAsString(goods);  
                              
        amqpTemplate.convertAndSend(
                            
                            "","queue1"
                            
                              ,msg);
    }
}
                            
                          

2、消息消费者 。

                            
                              @Component
@RabbitListener(queues 
                            
                            = "queue1"
                            
                              )

                            
                            
                              public
                            
                            
                              class
                            
                            
                               ReceiveService {
    @RabbitHandler
    
                            
                            
                              public
                            
                            
                              void
                            
                             receiveMsg(String msg) 
                            
                              throws 
                            
                            
                              JsonProcessingException {
        
                                ObjectMapper objectMapper  
                            
                            
                               = new ObjectMapper(); Goods goods = objectMapper.readValue(msg,Goods.class 
                            
                            
                                );  
                              
        System.out.println(
                            
                            "String---"+
                            
                              msg);
    }
}
                            
                          

9、基于Java的交换机与队列创建

我们使⽤消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应⽤程序中通过Java代码来完成创建

1、普通Maven项⽬交换机及队列创建

1、使⽤Java代码新建队列 。

                            
                              //
                            
                            
                              1.定义队列 (使⽤Java代码在MQ中新建⼀个队列)

                            
                            
                              //
                            
                            
                              参数1:定义的队列名称

                            
                            
                              //
                            
                            
                              参数2:队列中的数据是否持久化(如果选择了持久化)

                            
                            
                              //
                            
                            
                              参数3: 是否排外(当前队列是否为当前连接私有)

                            
                            
                              //
                            
                            
                              参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否还有数
                            
                            
                              据))

                            
                            
                              //
                            
                            
                              参数5:设置当前队列的参数
                            
                            
channel.queueDeclare("queue7",
                            
                              false
                            
                            ,
                            
                              false
                            
                            ,
                            
                              false
                            
                            ,
                            
                              null
                            
                            );
                          

2、新建交换机 。

                            
                              //
                            
                            
                              定义⼀个“订阅交换机”
                            
                            
channel.exchangeDeclare("ex3"
                            
                              , BuiltinExchangeType.FANOUT);

                            
                            
                              //
                            
                            
                              定义⼀个“路由交换机”
                            
                            
channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
                          

3、绑定队列到交换机 。

                            
                              //
                            
                            
                              绑定队列

                            
                            
                              //
                            
                            
                              参数1:队列名称

                            
                            
                              //
                            
                            
                              参数2:⽬标交换机

                            
                            
                              //
                            
                            
                              参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
                            
                            
channel.queueBind("queue7","ex4","k1"
                            
                              );
channel.queueBind(
                            
                            "queue8","ex4","k2");
                          

2、SpringBoot应⽤中通过配置完成队列的创建

                          
                            @Configuration

                          
                          
                            public
                          
                          
                            class
                          
                          
                             RabbitMQConfiguration {
    
                          
                          
                            //
                          
                          
                            声明队列
                          
                          
                                @Bean
    
                          
                          
                            public
                          
                          
                             Queue queue9(){
        Queue queue9 
                          
                          = 
                          
                            new
                          
                           Queue("queue9"
                          
                            );
        
                          
                          
                            //
                          
                          
                            设置队列属性
                          
                          
                            return
                          
                          
                             queue9;
    }
    @Bean
    
                          
                          
                            public
                          
                          
                             Queue queue10(){
        Queue queue10 
                          
                          = 
                          
                            new
                          
                           Queue("queue10"
                          
                            );
        
                          
                          
                            //
                          
                          
                            设置队列属性
                          
                          
                            return
                          
                          
                             queue10;
    }
    
                          
                          
                            //
                          
                          
                            声明订阅模式交换机
                          
                          
                                @Bean
    
                          
                          
                            public
                          
                          
                             FanoutExchange ex5(){
        
                          
                          
                            return
                          
                          
                            new
                          
                           FanoutExchange("ex5"
                          
                            );
    }
    
                          
                          
                            //
                          
                          
                            声明路由模式交换机
                          
                          
                                @Bean
    
                          
                          
                            public
                          
                          
                             DirectExchange ex6(){
        
                          
                          
                            return
                          
                          
                            new
                          
                           DirectExchange("ex6"
                          
                            );
    }
    
                          
                          
                            //
                          
                          
                            绑定队列
                          
                          
                                @Bean
    
                          
                          
                            public
                          
                          
                             Binding bindingQueue9(Queue queue9, DirectExchange ex6){
        
                          
                          
                            return
                          
                           BindingBuilder.bind(queue9).to(ex6).with("k1"
                          
                            );
    }
    @Bean
    
                          
                          
                            public
                          
                          
                             Binding bindingQueue10(Queue queue10, DirectExchange ex6){
        
                          
                          
                            return
                          
                           BindingBuilder.bind(queue10).to(ex6).with("k2"
                          
                            );
    }
}
                          
                        

⼗、消息的可靠性

消息的可靠性:从 ⽣产者发送消息 —— 消息队列存储消息 —— 消费者消费消息 的整个过程中消息的安全性及可控性。
  • ⽣产者
  • 消息队列
  • 消费者

1、RabbitMQ事务

RabbitMQ事务指的是基于客户端实现的事务管理,当在消息发送过程中添加了事务,处理效率降低⼏⼗倍甚⾄上百倍 
                          Connection connection = RabbitMQUtil.getConnection(); 
                          
                            //
                          
                          
                            connection 表示与 host1的连接
                          
                          
Channel channel =
                          
                             connection.createChannel();
channel.txSelect();
                          
                          
                            //
                          
                          
                            开启事务
                          
                          
                            try
                          
                          
                            {
    channel.basicPublish(
                          
                          "ex4", "k1", 
                          
                            null
                          
                          
                            , msg.getBytes());
    channel.txCommit();
                          
                          
                            //
                          
                          
                            提交事务
                          
                          
                            }

                          
                          
                            catch
                          
                          
                             (Exception e){
    channel.txRollback();
                          
                          
                            //
                          
                          
                            事务回滚
                          
                          
                            }

                          
                          
                            finally
                          
                          
                            {
    channel.close();
    connection.close();
}
                          
                        

2、RabbitMQ消息确认和return机制

1、消息确认机制:确认消息提供者是否成功发送消息到交换机
2、return机制:确认消息是否成功的从交换机分发到队列

2.1、普通Maven项⽬的消息确认

1、普通confirm⽅式 。

                            
                              //
                            
                            
                              1.发送消息之前开启消息确认
                            
                            
                                channel.confirmSelect();  
                              
channel.basicPublish(
                            
                            "ex1", "a", 
                            
                              null
                            
                            
                              , msg.getBytes());

                            
                            
                              //
                            
                            
                              2.接收消息确认
                            
                            
                               Boolean b = 
                            
                            
                                channel.waitForConfirms();  
                              
System.out.println(
                            
                            "发送:" +(b?"成功":"失败"));
                          

2、批量confirm⽅式 。

                            
                              //
                            
                            
                              1.发送消息之前开启消息确认
                            
                            
                               channel.confirmSelect(); 
                            
                            
                              //
                            
                            
                              2.批量发送消息
                            
                            
                              for
                            
                             (
                            
                              int
                            
                             i=0 ; i<10 ; i++
                            
                              ){
    channel.basicPublish(
                            
                            "ex1", "a", 
                            
                              null
                            
                            
                              , msg.getBytes());
}

                            
                            
                              //
                            
                            
                              3.接收批量消息确认:发送的所有消息中,如果有⼀条是失败的,则所有消息发送直接失败,抛出IO异常
                            
                            
                               Boolean b = channel.waitForConfirms(); 
                            
                          

3、异步confirm⽅式 。

                            
                              //
                            
                            
                              发送消息之前开启消息确认
                            
                            
                              channel.confirmSelect();

                            
                            
                              //
                            
                            
                              批量发送消息
                            
                            
                              for
                            
                             (
                            
                              int
                            
                             i=0 ; i<10 ; i++
                            
                              ){
    channel.basicPublish(
                            
                            "ex1", "a", 
                            
                              null
                            
                            
                              , msg.getBytes());
}

                            
                            
                              //
                            
                            
                              假如发送消息需要10s,waitForConfirms会进⼊阻塞状态

                            
                            
                              //
                            
                            
                              boolean b = channel.waitForConfirms();

                            
                            
                              //
                            
                            
                              使⽤监听器异步confirm
                            
                            
                               channel.addConfirmListener 
                            
                            (
                            
                               new 
                            
                            
                                ConfirmListener()  
                               {
    
                            
                            
                              //
                            
                            
                              参数1: long l 返回消息的表示
    
                            
                            
                              //
                            
                            
                              参数2: boolean b 是否为批量confirm
                            
                            
                              public
                            
                            
                              void
                            
                             handleAck(
                            
                              long
                            
                             l, Boolean b) 
                            
                              throws
                            
                            
                               IOException {
        System.out.println(
                            
                            "~~~~~消息成功发送到交换机"
                            
                              );
    }
    
                            
                            
                              public
                            
                            
                              void
                            
                             handleNack(
                            
                              long
                            
                             l, Boolean b) 
                            
                              throws
                            
                            
                               IOException {
        System.out.println(
                            
                            "~~~~~消息发送到交换机失败"
                            
                              );
    }
}
);
                            
                          

2.2、普通Maven项⽬的return机制

1、添加return监听器
2、发送消息是指定第三个参数为true
3、由于监听器监听是异步处理,所以在消息发送之后不能关闭channel
                              String msg = "Hello HuangDaoJun!"
                              
                                ;
Connection connection 
                              
                              =
                              
                                 ConnectionUtil.getConnection();

                              
                              
                                //
                              
                              
                                相当于JDBC操作的数据库连接
                              
                              
Channel channel =
                              
                                 connection.createChannel();

                              
                              
                                //
                              
                              
                                相当于JDBC操作的statement

                              
                              
                                //
                              
                              
                                return机制:监控交换机是否将消息分发到队列
                              
                              
channel.addReturnListener(
                              
                                new
                              
                              
                                 ReturnListener() {
    
                              
                              
                                public
                              
                              
                                void
                              
                               handleReturn(
                              
                                int
                              
                              
                                 i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,
                              
                              
                                byte
                              
                              [] bytes) 
                              
                                throws 
                              
                              
                                IOException {
        
                              
                              
                                //
                              
                              
                                如果交换机分发消息到队列失败,则会执⾏此⽅法(⽤来处理交换机分发消息到队列失败的情况)
                              
                              
        System.out.println("*****"+
                              
                                i);
                              
                              
                                //
                              
                              
                                标识
                              
                              
        System.out.println("*****"+
                              
                                s);
                              
                              
                                //

                              
                                      System.out.println("*****"+
                              
                                s1);
                              
                              
                                //
                              
                              
                                交换机名
                              
                              
        System.out.println("*****"+
                              
                                s2);
                              
                              
                                //
                              
                              
                                交换机对应的队列的key
                              
                              
        System.out.println("*****"+
                              
                                new
                              
                              
                                 String(bytes));
                              
                              
                                //
                              
                              
                                发送的消息
                              
                              
                                    }
}
);

                              
                              
                                //
                              
                              
                                发送消息

                              
                              
                                //
                              
                              
                                channel.basicPublish("ex2", "c", null, msg.getBytes());
                              
                              
channel.basicPublish("ex2", "c", 
                              
                                true
                              
                              , 
                              
                                null
                              
                              , msg.getBytes());
                            

2.3、在SpringBoot应⽤实现消息确认与return监听

1、配置application.yml,开启消息确认和return监听 。

                            
                              spring:
 rabbitmq:
    publisher
                            
                            -confirm-
                            
                              type: simple ## 开启消息确认模式
    publisher
                            
                            -returns: 
                            
                              true
                            
                             ##使⽤return监听机制
                          

2、创建confirm和return监听 。

2.1、消息确认 。

                            
                              @Component

                            
                            
                              public
                            
                            
                              class
                            
                             MyConfirmListener 
                            
                              implements
                            
                            
                              
RabbitTemplate.ConfirmCallback {
    @Autowired
    
                            
                            
                              private
                            
                            
                               AmqpTemplate amqpTemplate;
    @Autowired
    
                            
                            
                              private
                            
                            
                               RabbitTemplate rabbitTemplate;
    @PostConstruct
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                               init(){
       
                                rabbitTemplate.setConfirmCallback(  
                            
                            
                               this 
                            
                            
                                );  
                              
    }
    @Override
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                                confirm  
                              (CorrelationData correlationData, Boolean b, String s) {
        
                            
                            
                              //
                            
                            
                              参数b 表示消息确认结果
        
                            
                            
                              //
                            
                            
                              参数s 表示发送的消息
                            
                            
                              if
                            
                            
                              (b){
            System.out.println(
                            
                            "消息发送到交换机成功!"
                            
                              );
        } 
                            
                            
                              else
                            
                            
                              {
            System.out.println(
                            
                            "消息发送到交换机失败!"
                            
                              );
            amqpTemplate.convertAndSend(
                            
                            "ex4",""
                            
                              ,s);
        }
    }
}
                            
                          

2.2、return机制 。

                            
                              @Component

                            
                            
                              public
                            
                            
                              class
                            
                             MyReturnListener 
                            
                              implements
                            
                            
                               RabbitTemplate.ReturnsCallback
{
    @Autowired
    
                            
                            
                              private
                            
                            
                               AmqpTemplate amqpTemplate;
    @Autowired
    
                            
                            
                              private
                            
                            
                               RabbitTemplate rabbitTemplate;
    @PostConstruct
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                               init(){
        
                                rabbitTemplate.setReturnsCallback(  
                            
                            
                               this 
                            
                            
                                );  
                              
    }
    @Override
    
                            
                            
                              public
                            
                            
                              void
                            
                            
                                returnedMessage  
                              (ReturnedMessage returnedMessage) {
        System.out.println(
                            
                            "消息从交换机分发到队列失败"
                            
                              );
        String exchange 
                            
                            =
                            
                               returnedMessage.getExchange();
        String routingKey 
                            
                            =
                            
                               returnedMessage.getRoutingKey();
        String msg 
                            
                            =
                            
                               returnedMessage.getMessage().toString();
        amqpTemplate.convertAndSend(exchange,routingKey,msg);
    }
}
                            
                          

3、RabbitMQ消费者⼿动应答 

                          
                            @Component

                              @    Ra  bbitListener  
                            (queues
                          
                          ="queue01"
                          
                            )

                          
                          
                            public
                          
                          
                            class
                          
                          
                             Consumer1 {
    @RabbitHandler
    
                          
                          
                            public
                          
                          
                            void
                          
                          
                             process(String msg,Channel channel, Message message) 
                          
                          
                            throws
                          
                          
                             IOException {
        
                          
                          
                            try
                          
                          
                             {
            System.out.println(
                          
                          "get msg1 success msg = "+
                          
                            msg);
            
                          
                          
                            /**
                          
                          
                            
         * 确认⼀条消息:<br>
         * channel.basicAck(deliveryTag, false); <br>
         * deliveryTag:该消息的index <br>
         * multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息 <br>
       
                          
                          
                            */
                          
                          
                            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), 
                          
                          
                            false
                          
                          
                            );
        } 
                          
                          
                            catch
                          
                          
                             (Exception e) {
            
                          
                          
                            //
                          
                          
                            消费者处理出了问题,需要告诉队列信息消费失败
                          
                          
                            /**
                          
                          
                            
         * 拒绝确认消息:<br>
         * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
         * deliveryTag:该消息的index<br>
         * multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。<br>
         * requeue:被拒绝的是否重新⼊队列 <br>
       
                          
                          
                            */
                          
                          
                            
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), 
                          
                          
                            false
                          
                          , 
                          
                            true
                          
                          
                            );
            System.err.println(
                          
                          "get msg1 failed msg = "+
                          
                            msg);
        }
    }
}
                          
                        

4、消息消费的幂等性问题

消息消费的幂等性——多次消费的执⾏结果时相同的 (避免重复消费)
解决⽅案:处理成功的消息setnx到redis

⼗⼀、延迟机制

1、延迟队列

1、延迟队列——消息进⼊到队列之后,延迟指定的时间才能被消费者消费
2、AMQP协议和RabbitMQ队列本身是不⽀持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能
3、TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间

1、在创建队列的时候可以设置队列的存活时间,当消息进⼊到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
2、创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;
3、当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列

2、使⽤延迟队列实现订单⽀付监控

1、实现流程图 。

 2、创建交换机和队列 。

⼗⼆、消息队列作⽤/使⽤场景总结

1、解耦 。

场景说明:⽤户下单之后,订单系统要通知库存系统

2、异步 。

场景说明:⽤户注册成功之后,需要发送注册邮件及注册短信提醒

3、消息通信 。

场景说明:应⽤系统之间的通信,例如聊天室

4、流量削峰 。

场景说明:秒杀业务

5、⽇志处理 。

场景说明:系统中⼤量的⽇志处理

  。

最后此篇关于RabbitMQ保姆级教程最佳实践的文章就讲到这里了,如果你想了解更多关于RabbitMQ保姆级教程最佳实践的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

37 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com