gpt4 book ai didi

Nodejs使用ZooKeeper做服务发现

转载 作者:我是一只小鸟 更新时间:2023-02-17 22:31:12 28 4
gpt4 key购买 nike

将单体服务拆分为微服务后,为了服务高可用,一般会做集群多实例。但在分布式下,怎么进行高效、便捷的进行服务访问问题,出现了各类服务注册和服务发现框架。这里使用的是Zookeeper。ZooKeeper 官网 https://zookeeper.apache.org 。
我们的业务系统使用的开发语言是JAVA,但是部分页面请求是先到nodejs 做的webportal服务,进行权限校验,校验通过后调用Java提供的API。当前阶段Java端已经微服务化,使用Zookeeper作为注册中心,目前只需要让nodejs端,也接入到Zookeeper,作为服务消费者,就能搭建机器环境。

找轮子

通过查找,发现npm有现成的库  node-zookeeper-client  ,避免重复造轮子,就用它了.

接入思路

由于我们只是作为服务消费者,不需要使用服务注册的api,大部分可以直接在文档中找到API.

编码过程 

npm 安装

  1 npm i node-zookeeper-client   。

连接ZK

                          
                             1
                          
                           const Zookeeper = require('node-zookeeper-client'
                          
                            );

                          
                          
                             2
                          
                           const CONNECTION_STRING = "127.0.0.1:2181"; 
                          
                            //
                          
                          
                             ZK的服务地址
                          
                          
                             3
                          
                           const OPTIONS =
                          
                             {

                          
                          
                             4
                          
                             sessionTimeout: 5000 

                          
                             5
                          
                          
                            }

                          
                          
                             6
                          
                           const zk =
                          
                             Zookeeper.createClient(CONNECTION_STRING, OPTIONS);

                          
                          
                             7
                          
                             zk.on('connected', 
                          
                            function
                          
                          
                            (){

                          
                          
                             8
                          
                             console.log("zk====="
                          
                            , zk);

                          
                          
                             9
                          
                          
                            });

                          
                          
                            10
                          
                          
                            //
                          
                          
                            获取根节点下的子节点数据
                          
                          
                            11
                          
                           zk.getChildren('/', 
                          
                            function
                          
                          
                            (error, children, stat){

                          
                          
                            12
                          
                          
                            if
                          
                          
                            (error){

                          
                          
                            13
                          
                          
                                console.log(error.stack);

                          
                          
                            14
                          
                          
                            return
                          
                          
                            ;

                          
                          
                            15
                          
                          
                              }

                          
                          
                            16
                          
                          
                              console.log(children);

                          
                          
                            17
                          
                          
                            })

                          
                          
                            18
                          
                           zk.connect();
                        

其他API(仅供参考)

                          
                             1
                          
                          
                            //
                          
                          
                             判断节点是否已存在
                          
                          
                             2
                          
                           zk.exists('/phpnode',
                          
                            function
                          
                          
                            (error,stat){

                          
                          
                             3
                          
                          
                            if
                          
                          
                            (stat){

                          
                          
                             4
                          
                                   console.log("节点存在"
                          
                            );

                          
                          
                             5
                          
                              }
                          
                            else
                          
                          
                            {

                          
                          
                             6
                          
                                  console.log("节点不存在"
                          
                            );

                          
                          
                             7
                          
                          
                               }

                          
                          
                             8
                          
                          
                            })

                          
                          
                             9
                          
                          
                            10
                          
                          
                            //
                          
                          
                             创建/注册节点
                          
                          
                            11
                          
                           zk.create('/phpnode',
                          
                            new
                          
                           Buffer('hello'),
                          
                            function
                          
                          
                            (error,path){

                          
                          
                            12
                          
                          
                               console.log(path);

                          
                          
                            13
                          
                          
                            })

                          
                          
                            14
                          
                          
                            15
                          
                          
                            //
                          
                          
                             获取节点数据
                          
                          
                            16
                          
                           zk.getData('/phpnode',
                          
                            function
                          
                          
                            (error,data,stat){

                          
                          
                            17
                          
                          
                               console.log(data.toString());

                          
                          
                            18
                          
                          
                            });

                          
                          
                            19
                          
                          
                            20
                          
                          
                            //
                          
                          
                            节点删除
                          
                          
                            21
                          
                           zk.remove('/phpnode',
                          
                            function
                          
                          
                            (error){

                          
                          
                            22
                          
                          
                            if
                          
                          (!
                          
                            error){

                          
                          
                            23
                          
                                  console.log('node 节点删除成功'
                          
                            );

                          
                          
                            24
                          
                          
                                }

                          
                          
                            25
                          
                           })
                        

于是有了第一版本代码 。

                          
                             1
                          
                           const zookeeper = require('node-zookeeper-client'
                          
                            );

                          
                          
                             2
                          
                          
                             3
                          
                          
                             4
                          
                          
                            //
                          
                          
                             ZK基础配置信息,正式项目需要从环境文件导入
                          
                          
                             5
                          
                           export const ZK =
                          
                             {

                          
                          
                             6
                          
                               clientAddress: 'localhost:2181/zk/test', 
                          
                            //
                          
                          
                             ZK地址
                          
                          
                             7
                          
                               servicePath: '/test-service', 
                          
                            //
                          
                          
                             服务路径
                          
                          
                             8
                          
                          
                            };

                          
                          
                             9
                          
                          
                            10
                          
                           let zkClient = 
                          
                            null
                          
                          
                            ;

                          
                          
                            11
                          
                          
                            12
                          
                          
                            //
                          
                          
                             获取服务ip+port
                          
                          
                            13
                          
                           export const getZKServiceBaseUrl = (servicePath) =>
                          
                             {

                          
                          
                            14
                          
                          
                            return
                          
                          
                            new
                          
                           Promise((resolve, reject) =>
                          
                             {

                          
                          
                            15
                          
                          
                            try
                          
                          
                             {

                          
                          
                            16
                          
                          
                            //
                          
                          
                             防止重复连接
                          
                          
                            17
                          
                          
                            if
                          
                          
                             (zkClient) {

                          
                          
                            18
                          
                          
                                            disconnectZKService();

                          
                          
                            19
                          
                          
                                        }

                          
                          
                            20
                          
                          
                            21
                          
                          
                            //
                          
                          
                             新建连接
                          
                          
                            22
                          
                                       zkClient =
                          
                             zookeeper.createClient(ZK.clientAddress);

                          
                          
                            23
                          
                          
                            //
                          
                          
                             连接后执行一次
                          
                          
                            24
                          
                                       zkClient.once('connected', async 
                          
                            function
                          
                          
                             () {

                          
                          
                            25
                          
                          
                            //
                          
                          
                             获取服务节点信息
                          
                          
                            26
                          
                                           const res =
                          
                             await listChildren(zkClient, servicePath);

                          
                          
                            27
                          
                                           res.message ?
                          
                             reject(res) : resolve(res);

                          
                          
                            28
                          
                          
                                        });

                          
                          
                            29
                          
                          
                            30
                          
                          
                                        zkClient.connect();

                          
                          
                            31
                          
                                   } 
                          
                            catch
                          
                          
                             (error) {

                          
                          
                            32
                          
                          
                                        reject(error);

                          
                          
                            33
                          
                          
                                    }

                          
                          
                            34
                          
                          
                                });

                          
                          
                            35
                          
                          
                            };

                          
                          
                            36
                          
                          
                            37
                          
                          
                            //
                          
                          
                             断开链接
                          
                          
                            38
                          
                           export const disconnectZKService = () =>
                          
                             {

                          
                          
                            39
                          
                          
                            if
                          
                          
                             (zkClient) {

                          
                          
                            40
                          
                          
                                    zkClient.close();

                          
                          
                            41
                          
                          
                                }

                          
                          
                            42
                          
                          
                            };

                          
                          
                            43
                          
                          
                            44
                          
                          
                            //
                          
                          
                             获取节点信息,ip+port
                          
                          
                            45
                          
                          
                            function
                          
                          
                             listChildren(client, path) {

                          
                          
                            46
                          
                          
                            return
                          
                          
                            new
                          
                           Promise((resolve, reject) =>
                          
                             {

                          
                          
                            47
                          
                          
                                    client.getChildren(path,

                          
                          
                            48
                          
                          
                            function
                          
                          
                             () {},

                          
                          
                            49
                          
                          
                            function
                          
                          
                             (error, children) {

                          
                          
                            50
                          
                          
                            if
                          
                          
                             (error) {

                          
                          
                            51
                          
                          
                                                reject({

                          
                          
                            52
                          
                          
                                                    ...error,

                          
                          
                            53
                          
                          
                                                    message: `获取ZK节点error,Path: ${path}`

                          
                          
                            54
                          
                          
                                                });

                          
                          
                            55
                          
                          
                                            }

                          
                          
                            56
                          
                          
                            try
                          
                          
                             {

                          
                          
                            57
                          
                                               let addressPath = path + '/'
                          
                            ;

                          
                          
                            58
                          
                          
                            if
                          
                           (children.length > 1
                          
                            ) {

                          
                          
                            59
                          
                          
                            //
                          
                          
                            若存在多个地址,则随机获取一个地址
                          
                          
                            60
                          
                                                   addressPath += children[Math.floor(Math.random() *
                          
                             children.length)];

                          
                          
                            61
                          
                                               } 
                          
                            else
                          
                          
                             {

                          
                          
                            62
                          
                          
                            //
                          
                          
                            若只有唯一地址,则获取该地址
                          
                          
                            63
                          
                                                   addressPath += children[0
                          
                            ];

                          
                          
                            64
                          
                          
                                                }

                          
                          
                            65
                          
                          
                            //
                          
                          
                            获取服务地址
                          
                          
                            66
                          
                                               client.getData(addressPath, 
                          
                            function
                          
                          
                             (err, data) {

                          
                          
                            67
                          
                          
                            if
                          
                          
                             (err) {

                          
                          
                            68
                          
                          
                                                        reject({

                          
                          
                            69
                          
                          
                                                            ...error,

                          
                          
                            70
                          
                          
                                                            message: `获取ZK服务地址error,Stack: ${err.stack}`

                          
                          
                            71
                          
                          
                                                        });

                          
                          
                            72
                          
                          
                                                    }

                          
                          
                            73
                          
                          
                            if
                          
                           (!
                          
                            data) {

                          
                          
                            74
                          
                          
                                                        reject({

                          
                          
                            75
                          
                          
                                                            ...error,

                          
                          
                            76
                          
                          
                                                            message: `ZK data is not exist`

                          
                          
                            77
                          
                          
                                                        });

                          
                          
                            78
                          
                          
                                                    }

                          
                          
                            79
                          
                                                   const serviceInfo =
                          
                             JSON.parse(data);

                          
                          
                            80
                          
                          
                            81
                          
                                                   const url = serviceInfo.address + ':' +
                          
                             serviceInfo.port;

                          
                          
                            82
                          
                          
                                                    resolve(url);

                          
                          
                            83
                          
                          
                                                });

                          
                          
                            84
                          
                                           } 
                          
                            catch
                          
                          
                             (error) {

                          
                          
                            85
                          
                          
                                                reject({

                          
                          
                            86
                          
                          
                                                    ...error,

                          
                          
                            87
                          
                          
                                                    message: `list ZK children error`

                          
                          
                            88
                          
                          
                                                });

                          
                          
                            89
                          
                          
                                            }

                          
                          
                            90
                          
                          
                                        }

                          
                          
                            91
                          
                          
                                    );

                          
                          
                            92
                          
                          
                                });

                          
                          
                            93
                          
                           }
                        

通过测试代码,可以实现调用Java服务。可能一般的程序员实现功能了就好了,可是作为一个有点追求的,感觉代码哪里有问题。具体是哪里呢,盯着屏幕瞅了两分钟,发现每次获取服务都取 ZK 注册中心获取,这个过程涉及到的网络请求而且还不是一次HTTP,如果只是这么简单的改造,程序单纯在性能响应上很有可能还不如老版本。我们可以在获取服务的真实远程地址前,添加一个本地缓存。通过ZK订阅机制,更新本地缓存数据.

思路虽然明确了,可以api扫了扫,没有我们想要的监听器,如下所示 。

  。

这怎么办,按理说的应该会有一个,节点数据改变推送的监听器,例如新增,删除,修改等等。找了半天也没找到合适的.

没办法,接着看源码吧,看了一会,忽然,看到一个似乎可用的,类 。

  。

 这不就是我需要的类吗,但是居然在一方法中注入监听器,先试试吧.

  。

 试了一下,嘿,真的可以了,当服务端节点数据发生变动后,会自动触发监听器  watcher 的回调逻辑。这就好办了,改造开始.

改进后的代码

                          const zookeeper = require('node-zookeeper-client'
                          
                            );

                          
                          
                            var
                          
                           ZK = require('../config/env.js'
                          
                            ).zk;

const client 
                          
                          =
                          
                             Object.freeze({
    zkClient: zookeeper.createClient(ZK.connectionString),
    serviceSet: [], 
                          
                          
                            //

                          
                          
                                serviceCache: Object.freeze({
        map: 
                          
                          
                            new
                          
                          
                             Map(),

        
                          
                          
                            /*
                          
                          
                            *
         * 更新缓存
         * @param {String} path 服务路径
         * @param {Array<String>} arr 真实访问集合
         
                          
                          
                            */
                          
                          
                            
        updateCache: 
                          
                          
                            function
                          
                          
                             (path, arr) {
            
                          
                          
                            this
                          
                          
                            .map.set(path, arr);
        },

        
                          
                          
                            /*
                          
                          
                            *
         * 从缓存中获取访问地址
         * 
         * @param {String} path 服务路径
         * @returns String 真实访问地址
         
                          
                          
                            */
                          
                          
                            
        getRealPath: 
                          
                          
                            function
                          
                          
                             (path) {
            let arr 
                          
                          = 
                          
                            this
                          
                          
                            .map.get(path);

            
                          
                          
                            if
                          
                           (arr.length > 1) 
                          
                            //
                          
                          
                            若存在多个地址,则随机获取一个地址
                          
                          
                            return
                          
                           arr[Math.floor(Math.random() *
                          
                             arr.length)];
            
                          
                          
                            else
                          
                          
                            //
                          
                          
                            若只有唯一地址,则获取该地址
                          
                          
                            return
                          
                           arr[0
                          
                            ];
        }
    }),

    connect: 
                          
                          
                            function
                          
                          
                             () {
        console.info(
                          
                          "连接 zookeeper"
                          
                            );

        
                          
                          
                            this
                          
                          .zkClient.once('connected', 
                          
                            function
                          
                          
                             () {
            console.info(
                          
                          "连接成功"
                          
                            );
        });

        
                          
                          
                            this
                          
                          
                            .zkClient.connect();
    },

    getRealPath: 
                          
                          
                            function
                          
                          
                             (serviceName) {
        
                          
                          
                            return
                          
                          
                            new
                          
                           Promise(async (resolve, reject) =>
                          
                             {
            
                          
                          
                            if
                          
                           (
                          
                            this
                          
                          
                            .serviceSet.includes(serviceName)) {
                resolve(
                          
                          
                            this
                          
                          
                            .serviceCache.getRealPath(serviceName));
            } 
                          
                          
                            else
                          
                          
                             {
                
                          
                          
                            //
                          
                          
                             加载服务节点信息
                          
                          
                            this
                          
                          .loadChildren(serviceName).then(url => resolve(url)).
                          
                            catch
                          
                          (error =>
                          
                             reject(error));
            }
        });
    },

    loadChildren: 
                          
                          
                            function
                          
                          
                             (path) {
        console.info(
                          
                          "进入 loadChildren "
                          
                            );
        
                          
                          
                            return
                          
                          
                            new
                          
                           Promise((resolve, reject) =>
                          
                             {
            
                          
                          
                            this
                          
                          .zkClient.getChildren(path, (event) =>
                          
                             {
                console.info(
                          
                          " loadChildren watcher "
                          
                            , path, event);

                
                          
                          
                            this
                          
                          
                            .getChildren(event.path);
            }, (error, ids) 
                          
                          =>
                          
                             {
                console.info(
                          
                          " loadChildren callback "
                          
                            , path, error, ids);
                
                          
                          
                            if
                          
                          
                             (error) {
                    reject({
                        ...error,
                        message: `获取ZK节点error,Path: ${path}`
                    });
                } 
                          
                          
                            else
                          
                          
                             {
                    resolve(
                          
                          
                            this
                          
                          
                            .getData(path, ids));
                }
            });
        });
    },

    getChildren: 
                          
                          
                            function
                          
                          
                             (path) {
        console.info(
                          
                          "进入 getChildren "
                          
                            );
        
                          
                          
                            return
                          
                          
                            new
                          
                           Promise((resolve, reject) =>
                          
                             {
            
                          
                          
                            this
                          
                          .zkClient.getChildren(path, (error, ids) =>
                          
                             {
                console.info(
                          
                          " getChildren callback "
                          
                            , path, error, ids);
                
                          
                          
                            if
                          
                          
                             (error) {
                    reject({
                        ...error,
                        message: `获取ZK节点error,Path: ${path}`
                    });
                }

                resolve(
                          
                          
                            this
                          
                          
                            .getData(path, ids));
            });
        });
    },

    getData: 
                          
                          
                            function
                          
                          
                             (path, ids) {
        console.info(
                          
                          "进入 getData "
                          
                            );

        let pros 
                          
                          = ids.map(id => 
                          
                            new
                          
                           Promise((resolve, reject) =>
                          
                             {
            
                          
                          
                            //
                          
                          
                            获取服务地址
                          
                          
                            this
                          
                          .zkClient.getData(path + "/" + id, (error, data) =>
                          
                             {
                console.info(
                          
                          " getData callback "
                          
                            , path, id);
                
                          
                          
                            if
                          
                          
                             (error) {
                    reject({
                        ...error,
                        message: `获取ZK服务地址error,Stack: ${err.stack}`
                    });
                }
                
                          
                          
                            if
                          
                           (!
                          
                            data) {
                    reject({
                        ...error,
                        message: `ZK data is not exist`
                    });
                }

                const node 
                          
                          =
                          
                             JSON.parse(data).payload;
                const protocol 
                          
                          = node.ssl ? "https://" : "http://"
                          
                            ;

                resolve(`${protocol}${node.host}:${node.port}`);
            });
        }));

        
                          
                          
                            return
                          
                           Promise.all(pros).then(arr => 
                          
                            this
                          
                          .serviceCache.updateCache(path, arr)).then(() => 
                          
                            this
                          
                          
                            .serviceCache.getRealPath(path));
    },

    disconnect: 
                          
                          
                            function
                          
                           () { 
                          
                            //
                          
                          
                            断开连接
                          
                          
        console.info("进入 disconnect "
                          
                            )
        
                          
                          
                            if
                          
                           (
                          
                            this
                          
                          
                            .zkClient) {
            console.info(
                          
                          "执行 close"
                          
                            )
            
                          
                          
                            this
                          
                          
                            .zkClient.close();
        }
    },
});

client.connect();

module.exports 
                          
                          =
                          
                             {
    getServiceUrl: (path) 
                          
                          =>
                          
                             client.getRealPath(path),
    disconnect: () 
                          
                          =>
                          
                             client.disconnect(),
}
                          
                        

这样终于,好一点了.

未完待续(多节点的选择问题)

多节点选择策略:随机,轮转,粘性 等等,一般不同的项目使用的策略也不太一样,实例中使用的是简单随机策略,后续再进行节点选择的策略问题优化啦.

关机,收工!!! 。

  。

最后此篇关于Nodejs使用ZooKeeper做服务发现的文章就讲到这里了,如果你想了解更多关于Nodejs使用ZooKeeper做服务发现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com