gpt4 book ai didi

RabbitMQ消息中间件示例详解

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 30 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章RabbitMQ消息中间件示例详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

前言 。

rabbitmq 是使用 erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(advanced message queuing protocol, amqp).

与 kafka 等消息队列相比,rabbitmq 最大的优势在于其较高的可靠性

  • 提供确认(ack)和重传机制保证消息完成消费, 消费者异常不会导致消息丢失
  • 提供消息持久化机制, broker 崩溃不会导致消息丢失
  • 集群模式下工作, 保证高可用

因为具有较高可靠性和一致性, rabbitmq 可以胜任订单处理、秒杀等一致性要求较高的业务场景.

rabbitmq 概念与机制 。

rabbitmq 中的概念模型

  • broker: 消息中间件实例, 可能是单个节点也可能是运行在多节点集群上的逻辑实体
  • 消息(message): 消息由消息头和消息体两部分组成。消息头中包括routing-key、priority等标准消息头以及其它自定义消息头,用于定义rabbitmq对消息行为。消息体是字节流,包含消息内容。
  • 连接(connection): 客户端与 broker 之间的 tcp连接
  • 信道(channel): channel 是建立在 tcp 连接上的逻辑(虚拟)连接。多个 channel 复用同一个 tcp 连接, 以避免建立 tcp 连接的巨大开销。 rabbitmq 官方要求每个线程使用独立的 channel, 禁止多个线程共用 channel。
  • 生产者(publisher): 发送消息的客户端线程
  • 消费者(consumer): 处理消息的客户端线程
  • 交换机(exchange): 交换机负责将消息投递到相应的队列
  • 队列(queue): 接收并保存交换机投递的消息,直至被消费者成功消费。逻辑结构遵循先进先出fifo。
  • 绑定(binding): 将队列(queue)注册到交换机(exchange)的路由表
  • 虚拟主机(vhost): 每个broker下可建立多个vhost, 每个 vhost 可建立独立的 exchange、queue、绑定及权限系统。同一个 broker 下的 vhost 共享 connection、channel 和 用户系统,就是说可以使用同一个用户身份使用同一个 channel 访问不同 vhost。

交换机(exchange) 。

生产者发送的消息会首先送到交换机(exchange), 交换机根据自身类型和消息的 routing-key 等信息将消息投递到绑定的消息队列中.

rabbitmq中的四种标准交换机

direct: 如果消息的 routing-key 与队列的 binding-key 完全相同,direct类型的交换机则会将消息投递到该队列中.

  • 多个队列可以使用相同的 binding-key 绑定到同一个 direct 交换机,direct 交换机会把消息投递到所有 binding-key 与消息 routing-key 相同的队列

topic: 允许队列的 binding-key 中包含通配符*和#, topic 交换机会将消息投递到 binding-key 与 routing-key 匹配的队列中.

  • 通配符按照关键字进行匹配,如news.cn.a中的关键字是news、cn和a,即关键字按照.分割
  • #通配符匹配0个或多个关键字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
  • *通配符匹配一个关键字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a

fanout: fanout 交换机不进行任何匹配, 将消息投递到所有绑定的队列 。

header: header 交换机根据消息头进行投递,现在已较少使用 。

我们可以使用 rabbitmq 的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的.

消息头中的delivery-mode可以设置为 persistent(持久化) 或者 transient(易失)。 exchange 和 queue 在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理, 即使 rabbitmq 崩溃也不会丢失.

消费者客户端通常使用的channel.basicconsume使用推(push)模式投递消息, 即当有新消息时 broker 通过 channel 主动向客户端发送消息。客户端也可以使用channel.basicget从 broker 拉取消息.

ack机制 。

rabbitmq 提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失.

确认送达的回执有三种

  • ack: 消息已被成功处理
  • nack: 消息处理异常, 需要重新投递
  • reject: 消息非法, 丢弃消息

rabbitmq 的 queue 可以设置 no_ack=true, 则消息被投递后即删除不等待回执.

channel.basicconsume 可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ack回执,否则必须显式的发出回执.

java 代码示例 。

首先安装并启动rabbitmq实例, mac用户可以使用 homebrew 进行安装

?
1
brew install rabbitmq

启动服务

?
1
brew services start rabbitmq

或者使用官方docker镜像

?
1
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq: 3 -management

rabbitmq官网提供了ubuntu、rpm以及windows等多种平台安装方式.

rabbitmq默认tcp端口为5672, web控制台默认端口15672.

在maven中添加依赖

?
1
2
3
4
5
<dependency>
  <groupid>com.rabbitmq</groupid>
  <artifactid>amqp-client</artifactid>
  <version> 5.5 . 1 </version>
</dependency>

编写生产者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package rabbit;
 
import java.io.ioexception;
import java.util.concurrent.timeoutexception;
 
import com.rabbitmq.client.amqp;
import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
 
/**
  * @author finley
  */
public class rabbitproducer {
 
  public static void main(string[] args) throws ioexception, timeoutexception {
   connectionfactory factory = new connectionfactory();
   factory.setusername( "guest" );
   factory.setpassword( "guest" );
   factory.sethost( "localhost" );
   try (connection conn = factory.newconnection();
     channel channel = conn.createchannel()) {
    string exchangename = "test-exchange" ;
    channel.exchangedeclare(exchangename, "direct" , true );
 
    string routingkey = "hello" ;
 
    byte [] msg = "hello world" .getbytes();
    amqp.basicproperties.builder propsbuilder = new amqp.basicproperties.builder();
    propsbuilder.deliverymode( 2 ); // persistent
    propsbuilder.priority( 0 ); // normal
    propsbuilder.contenttype( "text/plain" );
    channel.basicpublish(exchangename, routingkey, propsbuilder.build(), msg);
   }
  }
}

编写消费者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package rabbit;
 
import java.io.ioexception;
import java.util.concurrent.timeoutexception;
 
import com.rabbitmq.client.*;
 
/**
  * @author finley
  */
public class rabbitconsumer {
 
  public static void main(string[] args) throws ioexception, timeoutexception {
   connectionfactory factory = new connectionfactory();
   factory.setusername( "guest" );
   factory.setpassword( "guest" );
   factory.sethost( "localhost" );
   try (connection conn = factory.newconnection();
     channel channel = conn.createchannel()) {
    string exchangename = "test-exchange" ;
    channel.exchangedeclare(exchangename, "direct" , true );
 
    string queuename = channel.queuedeclare().getqueue();
    string bindingkey = "hello" ;
    channel.queuebind(queuename, exchangename, bindingkey);
 
    while ( true ) {
     channel.basicconsume(queuename, false , "" , new defaultconsumer(channel) {
      @override
      public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte [] body) throws ioexception {
       string routingkey = envelope.getroutingkey();
       string contenttype = properties.getcontenttype();
       string bodystr = new string(body, "utf-8" );
       system.out.println( "routingkey: " + routingkey + ", contenttype: " + contenttype + ", body: " + bodystr);
       long deliverytag = envelope.getdeliverytag();
       channel.basicack(deliverytag, false );
      }
     });
    }
   }
  }
 
}

rabbitmq 的消息为字节, 可以将 java 对象序列化后作为消息体发送.

总结 。

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我的支持.

原文链接:https://www.cnblogs.com/Finley/p/10126315.html 。

最后此篇关于RabbitMQ消息中间件示例详解的文章就讲到这里了,如果你想了解更多关于RabbitMQ消息中间件示例详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com