gpt4 book ai didi

mysql - Node MySQL 池集群事件发射器不工作

转载 作者:行者123 更新时间:2023-11-29 18:48:33 25 4
gpt4 key购买 nike

documentation给出了为池注册连接事件发射器的示例:

pool.on('connection', function (connection) {
connection.query('SET SESSION auto_increment_increment=1');
});

文档展示了如何从集群获取池:

var pool = poolCluster.of('SLAVE*', 'RANDOM');
pool.getConnection(function (err, connection) {});
pool.getConnection(function (err, connection) {});
pool.query(function (error, results, fields) {});

但是,var pool = poolCluster.of('SLAVE*', 'RANDOM'); 后跟 pool.on( ... ) 错误,且 pool.on 不是一个函数

尝试通过集群注册.on('connection'),执行时不会出错,但没有效果。

重现代码:

var mysql = require('mysql');

var mysql_pool_cluster = mysql.createPoolCluster();
mysql_pool_cluster.add('myPool', {user: 'root', password: 'password'});
mysql_pool_cluster.on('connection', function(new_conn) {
console.log('hello from cluster event emitter');
new_conn.release();
});

var pool = mysql_pool_cluster.of('myPool', 'ORDER');
try {
pool.on('connection', function(new_conn) {
console.log('hello from pool event emitter');
new_conn.release();
});
} catch(err) {
console.error(err);
}

console.log('');

pool.getConnection(function(err, conn) {
if (err) {
console.error(err);
} else {
console.log('hello from new connection');
conn.release();

mysql_pool_cluster.end(function(err) {
if (err) {
console.error(err);
}
});
}
});

以上代码的输出:

TypeError: pool.on is not a function
at Object.<anonymous> (E:\scratch\scratch_server.js:14:7)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
at Function.Module._load (module.js:438:3)
at Module.runMain (module.js:604:10)
at run (bootstrap_node.js:394:7)
at startup (bootstrap_node.js:149:9)
at bootstrap_node.js:509:3

hello from new connection

如您所见,pool.on('connection') 无法执行,并且 cluster.on('connection') 在运行时不会发出第一次.getConnection()

最佳答案

我认为集群库不符合规范,因此我编写了自己的集群类:

var mysql = require('mysql');
var Promise = require('promise');

var deepCopy = function(obj) {
// https://stackoverflow.com/a/15040626
return JSON.parse(JSON.stringify(obj));
};
var logger = {log: console.log, error: console.error};

class mysqlPoolCluster {
// custom class to work around event emitter bug in mysql.createPoolCluster()
// https://stackoverflow.com/q/44466894

constructor() {
this._pool_dict = {};
this._future_pools_on_events_dict = {};
}

mergeAndSplitConfs(segregated_confs_dict) {
/*
Converts this object --
{
pools: {
admin: {connectionLimit: 1, user: 'my_admin', password: 'password'},
read: {user: 'my_reader', password: 'password'},
write: {user: 'my_writer', password: 'password'},
read_write: {user: 'my_reader_writer', password: 'password'}
},
host: 'localhost',
database: 'my_db',
connectionLimit: 2
}

to this object --
{
admin: {
connectionLimit: 1,
user: 'my_admin',
password: 'password',
host: 'localhost',
database: 'my_db'
},
read: {
connectionLimit: 2,
user: 'my_reader',
password: 'password',
host: 'localhost',
database: 'my_db'
},
write: {
connectionLimit: 2,
user: 'my_writer',
password: 'password',
host: 'localhost',
database: 'my_db'
},
read_write: {
connectionLimit: 2,
user: 'my_reader_writer',
password: 'password',
host: 'localhost',
database: 'my_db'
}
}
*/

var pools_dict = deepCopy(segregated_confs_dict.pools);
if (!pools_dict) {
throw new Error("arg does not have property 'pools'");
}

var base_conf = deepCopy(segregated_confs_dict);
delete base_conf.pools;

var base_keys = Object.keys(base_conf);
var pool_names = Object.keys(pools_dict);
for(var i_pool_name=0; i_pool_name < pool_names.length; i_pool_name++) {
var pool_conf = pools_dict[pool_names[i_pool_name]];
for(var i_base_key=0; i_base_key < base_keys.length; i_base_key++) {
var base_key = base_keys[i_base_key];
if (!pool_conf.hasOwnProperty(base_key)) {
pool_conf[base_key] = deepCopy(base_conf[base_key]);
}
}
}

return pools_dict;
}

populatePools(confs_dict) {
// 'confs_dict' is the return from this.mergeAndSplitConfs()
var names = Object.keys(confs_dict);
try {
for(var i_name=0; i_name < names.length; i_name++) {
var name = names[i_name];
this.createAndAddPool(name, confs_dict[name]);
}
} catch(err) {
this.endClusterAndRemovePoolsPromiser()
.catch(logger.error);

throw err;
}
}

createAndAddPool(name, conf) {
if (this._pool_dict.hasOwnProperty(name)) {
throw new Error("pool '" + name + "' already exists");
}

this._pool_dict[name] = mysql.createPool(conf);

try {
this.getPool(name).on('connection', function(conn) {
conn.queryPromiser = function(sql, args) {
return new Promise(function(resolve, reject) {
conn.query(
sql,
args,
function(err, results, fields) {
if (err) {
reject(err);
} else {
resolve( {"results": results, "fields": fields} );
}
}
);
});
};
});

var that = this;

this.getPool(name).queryPromiser = function(sql, args) {
return new Promise(function(resolve, reject) {
that.getPool(name).query(
sql,
args,
function(err, results, fields) {
if (err) {
reject(err);
} else {
resolve( {"results": results, "fields": fields} );
}
}
);
});
};

this.getPool(name).getConnectionPromiser = function() {
return new Promise(function(resolve, reject) {
that.getPool(name).getConnection(
function(err, conn) {
if (err) {
reject(err);
} else {
resolve(conn);
// remember to call conn.release() when you're finished with the conn
}
}
);
});
};

var events = Object.keys(this._future_pools_on_events_dict);
for(var i_event=0; i_event < events.length; i_event++) {
var event = events[i_event];
for(var i_cb=0; i_cb < this._future_pools_on_events_dict[event].length; i_cb++) {
this.getPool(name).on(event, this._future_pools_on_events_dict[event][i_cb]);
}
}

return this.getPool(name);
} catch(err) {
this.endAndRemovePoolPromiser(name)
.catch(logger.error);

throw err;
}
}

getPool(name) {
if (this._pool_dict.hasOwnProperty(name)) {
return this._pool_dict[name];
} else {
throw new Error("pool '" + name + "' does not exist");
}
}

endAndRemovePoolPromiser(name) {
var that = this;
return new Promise(function(resolve, reject) {
that.getPool(name).end(function(err) {
delete that._pool_dict[name];

if (err) {
reject(err);
} else {
resolve();
}
});
});
}

endClusterAndRemovePoolsPromiser() {
var end_promises = [];
var err_list = [];
var names = Object.keys(this._pool_dict);
for(var i_name=0; i_name < names.length; i_name++) {
end_promises.push(
this.endAndRemovePoolPromiser(names[i_name])
.catch(function(err) {
err_list.push(err);
})
);
}

return Promise.all(end_promises)
.then(function() {
if (err_list.length) {
return Promise.reject(err_list);
}
});
}

on(event, cb) {
var names = Object.keys(this._pool_dict);
for(var i_name=0; i_name < names.length; i_name++) {
this.getPool(names[i_name]).on(event, cb);
}

if (!this._future_pools_on_events_dict.hasOwnProperty(event)) {
this._future_pools_on_events_dict[event] = [];
}
this._future_pools_on_events_dict[event].push(cb);
}
}

关于mysql - Node MySQL 池集群事件发射器不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44466894/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com