gpt4 book ai didi

分布式多协议接入网关FluxMQ-2.0功能说明

转载 作者:我是一只小鸟 更新时间:2023-07-12 06:33:23 25 4
gpt4 key购买 nike

FluxMQ—2.0版本更新内容

前言

FLuxMQ是一款基于java开发,支持无限设备连接的云原生分布式物联网接入平台。FluxMQ基于Netty开发,底层采用Reactor3反应堆模型,具备低延迟,高吞吐量,千万、亿级别设备连接;方便企业快速构建其物联网平台与应用.

FluxMQ官网:https://www.fluxmq.com FluxMQ演示系统:http://demo.fluxmq.com/ 。

改动说明

功能 说明
自研EventBus通信组件 提供大批量数据路由压缩等功能
分布式发布订阅匹配树 提供更快速度的Topic路由
会话消息 支持Web管理会话消息、数据持久化
保留消息 支持Web管理保留消息、数据持久化
延迟消息 支持Web管理延迟消息 、数据持久化
配置持久化 Web配置页面配置持久化
规则引擎 新增LOG数据源,数据写入独立LOG文件
规则引擎 新增JSON函数,用户处理嵌套的JSON数据格式
规则引擎 新增协议扩展数据格式,用户统一转发第三方扩展协议
多协议模块 基于FluxMQ MQTT协议拓展多协议模块、提供同一的连接订阅管理能力

自研EventBus通信组件

研发背景

去除Ignite通信组件,采用自研集群通信,结合分布式订阅树完成性能的大幅度提升 。

Flux1.0版本采用Ignite的message API进行数据路由,此方式主要有以下几个缺点:

  1. 集群间通信采用广播方式,大集群下通信性能极低
  2. 不支持通配符方式,无法解决通配符的路由
  3. 无法实现集群消费能力
  4. 对Ignite依赖非常重,导致出现问题排查代价很高,并且无法替换之

EventBus特性

集群通信

基于TCP组件(后面拓展UDP组播等功能),实现集群间启动相互连接,服务端实现端口占用扫描启动:默认48880端口、如果端口占用则依次递增,最大端口号为:49000。 节点启动后,客户端根据配置文件配置的集群IP自动进行端口扫描连接(48880->49000),同时客户端维护与服务端的心跳。避免节点宕机.

报文帧

表格示例

固定头(1 byte) Topic长度(1 byte) Topic(n byte) Body长度(2 byte) Body(n byte)
消息类型 2bit Qos 2bit 是否压缩 1bit 是否批量 1bit 保留bit 2bit 9 test/test 11 HELLO,WORLD

批量压缩

FluxMQ对于集群消息路由会自动计算TPS,当单节点TPS超过2000时,会自动启动批量压缩功能,以此提高集群间传输性能(对时延要求极高的可以手动关闭批量压缩功能).

分布式发布订阅匹配树

FluxMQ集群节点间维护一个Root级别的订阅,订阅会分为2种

  • 本地订阅
  • 远程订阅 为了最快搜索匹配树,订阅信息会维护到每个节点中,当推送Topic通过匹配树匹配后,本地订阅直接Write消息,远程订阅走EventBus系统传输到远程节点。

数据管理

2.0版本新增延迟消息跟会话消息,1.0版本也有保留消息,但是管理页面未实现可视化管理。下面我们介绍下此次改动的一些 。

会话消息

提供分布式会话消息,会话期间Session消息持久化存储,集群宕机后重启,数据不丢失,重启集群后数据重新加载 。

保留消息

根据Topic保留消息,每个TOPIC仅保留一条,当传输的MQTT payload为空时,则清空保留消息。数据持久化存储,重启集群后,数据重新加载 。

延迟消息

FluxMQ提供大批量定时下发Topic指令的能力,单机支持百万级别延迟消息指令下发,在集群模式下,FluxMQ接收到延迟指令后,会自动负载到执行节点执行,当执行节点宕机后,此节点未执行的任务会自动由其他节点继续执行,提供分布式协调任务的能力 。

延迟Topic格式:

                        
                        
                        
                           $DELAY 
                          /延迟秒指/TOPIC
                          

配置持久化

基于Ignite的实现配置数据区持久化,目前持久化的数据内容有以下:

数据区 是否开启持久化
数据源配置 ✔️
规则配置 ✔️
ACL配置 ✔️
系统配置 ✔️
保留消息 ✔️
会话消息 ✔️
延迟消息 ✔️
规则引擎 ✔️
云客户端 ✔️
协议扩展 ✔️

规则引擎

LOG文件打印

此项功能可以用于调试报文,并且于集群各节点生产独立的log文件,用于快速定位问题 。

数据库SQL模板支持Json函数'

数据输入:

                        
                        
                        
                          {
                          
     "msg" : {
       "id" : "id" ,
       "body" :{
         "state" :1,
         "no" :2
      }
    },
     "messageId" : 1,
     "topic" "test" ,
     "qos" : 1,
     "retain" false ,
     "time" "2022 12-22 12:00:00" ,
     "clientId" "A1212313"
}

此时我只想插入msg内容下的body结构体,以下是一个通用的插入SQL语句模板:

                        
                        
                        
                          insert into table (clientId,topic,msg) values (
                           '${clientId}' 
                          ,
                           '${topic}' 
                          ,
                           '${json(msg.body)}' 
                          )
                          

通过json(变量名) 方式给结构体转成json字符串替换成插入字段的值 。

多协议模块

目前FluxMQ内置了COAP、WEBSOCKET、I1协议的组件,可以指定端口启动,启动后,可以通过MQTT与协议组件之间交互。每个客户端必须按照FluxMQ的标准进行接入。扩展协议与FluxMQ的MQTT共享以下组件:

  • 认证模块
  • 规则引擎
  • 连接管理
  • 日志管理
  • 监控管理

上行指令

通过规则引擎配置选择扩展协议数据类型 。

select * from "$EVENT.EXTENSION" 。

传输的数据格式如下:

                        
                        
                        
                          {
                          
     "protocol" "I1" ,
     "cmd" "PUBLISH" ,
     "messageId" : 0,
     "time" "2023-07-11 21:59:23" ,
     "clientId" "clientId" ,
     "nodeIp" "127.0.0.1" ,
     "clientIp" "127.0.0.1:19999" ,
     "body" "body"
}
字段 说明
protocol 协议名称
cmd 指令类型
- PUBLISH 推送消息
- CONNECT 连接
- CLOSE 断开
messageId 消息id
time 时间
clientIp 客户端地址
nodeIp 所在集群节点IP
body 报文,如果传输是JSON会自动转成JSON格式,否则统一UTF8字符串处理

新增一个转发WEBSOCKET协议的报文

SQL如下:

                        
                        
                        
                          select * from 
                           "  $EVENT  .EXTENSION WHERE protocol='WEBSOCKET'" 
                          

新增一个转发WEBSOCKET 上报协议的报文

SQL如下:

                        
                        
                        
                          select * from 
                           "  $EVENT  .EXTENSION WHERE protocol='WEBSOCKET' AND cmd ='PUBLISH' 

下行指令

通过MQTT客户端下发FluxMQ集群指令,即可将指令写给扩展协议客户端,格式如下:

                        
                        
                        
                           $PROTOCOL 
                          /协议名称/{clientId}
                          

连接管理

启动WEBSOCKET协议插件

WEBSOCKET客户端连接

                        
                        
                        
                          ws://123.249.9.130:7777/
                           test 
                          

连接管理

最后此篇关于分布式多协议接入网关FluxMQ-2.0功能说明的文章就讲到这里了,如果你想了解更多关于分布式多协议接入网关FluxMQ-2.0功能说明的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com