gpt4 book ai didi

javascript - 将数据附加/插入到 MongoDB 中

转载 作者:太空宇宙 更新时间:2023-11-04 02:12:46 26 4
gpt4 key购买 nike

各位 stackoverflow 用户,大家好,我目前面临一个有关 mongodb 和 nodejs 的问题。我正在尝试将数据插入到 mongodb 中,并且我已准备好所有数据以插入到 mongodb 中。我用来插入的数据是csv类型文件。

这是我的项目。我的项目中有一个客户端和服务器。服务器将从 CSV 文件中获取数据,监听客户端并将数组内存空间中的 CSV 文件中的数据发送到客户端。客户端将连接到服务器,请求数据并将数据附加到 mongodb。

client.js 中的以下代码很可能会帮助我使用它的值将其附加到 mongodb 中。有人可以帮我吗?

monitoredItem.on("changed",function(dataValue){
console.log(" New Data Receive = ",dataValue.value.value);
});

不确定如何将数据追加到 mongodb 中。

这是我的client.js

/*global require,console,setTimeout */
var opcua = require("node-opcua");
var async = require("async");
var fs = require("fs");
var csv = require("fast-csv");
var sleep = require("system-sleep");

var client = new opcua.OPCUAClient();
var endpointUrl = "opc.tcp://" + require("os").hostname() + ":4334/UA/MyLittleServer";


var the_session, the_subscription;

async.series([

// step 1 : connect to
function(callback) {
client.connect(endpointUrl,function (err) {
if(err) {
console.log(" cannot connect to endpoint :" , endpointUrl );
} else {
console.log("connected !");
console.log("Endpoint URL ", endpointUrl);
}
callback(err);
});
},

// step 2 : createSession
function(callback) {
client.createSession( function(err,session) {
if(!err) {
the_session = session;
}
callback(err);
});
},

// step 3 : browse
function(callback) {
the_session.browse("RootFolder", function(err,browse_result){
if(!err) {
browse_result[0].references.forEach(function(reference) {
console.log( reference.browseName.toString());
});
}
callback(err);
});
},

// step 4 : read a variable with readVariableValue
//function(callback) {
// the_session.readVariableValue("ns=2000;s=TEST", function(err,dataValue) {
// if (!err) {
// console.log(" free mem % = " , dataValue.toString());
// }
// callback(err);
// });
//},

// step 4' : read a variable with read
//function(callback) {
// var max_age = 0;
// var nodes_to_read = [
// { nodeId: "ns=2000;s=TEST", attributeId: opcua.AttributeIds.Value }
// ];
// the_session.read(nodes_to_read, max_age, function(err,nodes_to_read,dataValues) {
// if (!err) {
// console.log(" free mem % = " , dataValues[0]);
// }
// callback(err);
// });
//},


// function(callback){
// the_session.readVariableValue("ns=74;s=Dou", function(err,dataValue) {
// if(!err){
// console.log("Test Success", dataValue.toString());
// }
// callback(err);
// });
// },
//
// function(callback){
// the_session.readVariableValue("ns=74;s=Float", function(err,dataValue) {
// if(!err){
// console.log("Test Success", dataValue.toString());
// }
// callback(err);
// });
// },
//
// function(callback){
// the_session.readVariableValue("ns=74;s=String", function(err,dataValue) {
// if(!err){
// console.log("Test Success", dataValue.toString());
// }
// callback(err);
// });
// },

// function(callback){
// the_session.readVariableValue("ns=1;s=CSV", function(err, dataValue) {
// if(!err){
// console.log(dataValue.toString());
// sleep(5000);
// }
// callback(err);
// });
// },

// function(callback){
// the_session.readVariableValue("ns=1;s=CSV", function(err, dataValue) {
// if(!err){
// fs.createReadStream(dataValue.toString())
// console.log(dataValue.toString());
// sleep(5000);
// .pipe(csv())
// .on('data', function(data){
// console.log(csv);
// sleep(5000);
// })
// .op('end', function(data){
// console.log("Read Finish")
// });
// }
// callback(err);
// });
// },





// step 5: install a subscription and install a monitored item for 10 seconds
function(callback) {

the_subscription=new opcua.ClientSubscription(the_session,{
requestedPublishingInterval: 1000,
requestedLifetimeCount: 10,
requestedMaxKeepAliveCount: 2,
maxNotificationsPerPublish: 10,
publishingEnabled: true,
priority: 10
});

the_subscription.on("started",function(){
console.log("subscription started for 2 seconds - subscriptionId=",the_subscription.subscriptionId);
}).on("keepalive",function(){
console.log("keepalive");
}).on("terminated",function(){
callback();
});

setTimeout(function(){
the_subscription.terminate();
},10000000);

// install monitored item
var monitoredItem = the_subscription.monitor({
nodeId: opcua.resolveNodeId("ns=2000;s=TEST"),
attributeId: opcua.AttributeIds.Value
},
{
samplingInterval: 100,
discardOldest: true,
queueSize: 10
},
opcua.read_service.TimestampsToReturn.Both
);
console.log("-------------------------------------");

monitoredItem.on("changed",function(dataValue){
console.log(" New Data Receive = ",dataValue.value.value);
});
},

// close session
function(callback) {
the_session.close(function(err){
if(err) {
console.log("session closed failed ?");
}
callback();
});
}

],
function(err) {
if (err) {
console.log(" failure ",err);
} else {
console.log("done!");
}
client.disconnect(function(){});
}) ;

这是我的server.js

/*global require,setInterval,console */
var opcua = require("node-opcua");
var fs = require("fs");
var csv = require("fast-csv");
var sleep = require("system-sleep");

var currentCount = 0;
var array = ["No New Data"];

fs.createReadStream('1000data.csv')
.pipe(csv())
.on('data', function(data){
array.push(JSON.stringify(data));
//sleep(5);
})
.on('end', function(data) {
});


// Let's create an instance of OPCUAServer
var server = new opcua.OPCUAServer({
port: 4334, // the port of the listening socket of the server
resourcePath: "UA/MyLittleServer", // this path will be added to the endpoint resource name
buildInfo : { //Information of the build, Retrieve the server information for the current instance of the db client
productName: "MySampleServer1",
buildNumber: "7658",
buildDate: new Date(2014,5,2)
}
});



function post_initialize() {
console.log("initialized");
function construct_my_address_space(server) {

var addressSpace = server.engine.addressSpace;

// declare a new object
var device = addressSpace.addObject({
organizedBy: addressSpace.rootFolder.objects,
browseName: "MyDevice"
});

// add some variables
// add a variable named MyVariable1 to the newly created folder "MyDevice"
var variable1 = 1;

// emulate variable1 changing every 500 ms
setInterval(function(){ variable1+=1; }, 1000);

addressSpace.addVariable({
componentOf: device,
browseName: "MyVariable1",
dataType: "Double",
value: {
get: function () {
return new opcua.Variant({dataType: opcua.DataType.Double, value: variable1 });
}
}
});




// var variableTest = "Test";

// addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=1;b=1020FFAA",
// browseName: "MyVariableTest",
// dataType: "String",
// value: {
// get: function () {
// return new opcua.Variant({dataType: opcua.DataType.String, value: variableTest });
// },
// set: function (variant) {
// variable2 = parseFloat(variant.value);
// return opcua.StatusCodes.Good;
// }
// }
// });

// server.jsonVar = server.engine.addVariable("MyDevice", {
// browseName: "JSONObject",
// dataType: "String",
// value: {
// get:function(){
// return new opcua.Variant({
// dataType: opcua.DataType.String,
// value: get_json_string()
// });
// }
// }
// });

// var varTestDou = addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=74;s=Dou",
// browseName : "VarD",
// dataType: "Double",
// value: new opcua.Variant({dataType: opcua.DataType.Double, value: [10.0,292.31,412.345,185,3453.245]})

// });

// var varTestFloat = addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=74;s=Float",
// browseName : "VarF",
// dataType: "Float",
// value: new opcua.Variant({dataType: opcua.DataType.Float, value: [10.0,402.23,123,34,643,34]})

// });

// var varTestString = addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=74;s=String",
// browseName : "VarT",
// dataType: "String",
// value: new opcua.Variant({dataType: opcua.DataType.String, value: "001,41,54,87,23,12/3/2016,8:39am"})

// });

var csvFile = addressSpace.addVariable({
componentOf: device,
nodeId: "ns=1;s=CSV",
browseName: "csvData",
dataType: "String",
value: new opcua.Variant({dataType: opcua.DataType.String, value: array[variable1]})
});




// fs.createReadStream('1000data.csv')
// .pipe(csv())
// .on('data', function(data){
// console.log("Data uploaded");
// })
// .on('end', function(date) {
// console.log("Read Finish");
// });




//look at this if you want to send data from client to server
// add a variable named MyVariable2 to the newly created folder "MyDevice"
var variable2 = 10.0;

server.engine.addressSpace.addVariable({

componentOf: device,

nodeId: "ns=2000;b=1020FFAA", // some opaque NodeId in namespace 4

browseName: "MyVariable2",

dataType: "Double",

value: {
get: function () {
return new opcua.Variant({dataType: opcua.DataType.Double, value: variable2 });
},
set: function (variant) {
variable2 = parseFloat(variant.value);
return opcua.StatusCodes.Good;
}
}
});


var os = require("os");
/**
* returns the percentage of free memory on the running machine
* @return {double}
*/
function available_memory() {
if (currentCount < array.length-1){
currentCount += 1 ;
console.log(array[currentCount]);
return currentCount;
} else {
console.log(array[0]);
return 0;
}

}
server.engine.addressSpace.addVariable({

componentOf: device,

nodeId: "ns=2000;s=TEST", // a string nodeID
browseName: "FreeMemory",
dataType: "String",
value: {
get: function () {return new opcua.Variant({dataType: opcua.DataType.String, value: array[available_memory()] });}
}
});
}
construct_my_address_space(server);
server.start(function() {
console.log("Server is now listening ... ( press CTRL+C to stop)");
console.log("port ", server.endpoints[0].port);
var endpointUrl = server.endpoints[0].endpointDescriptions()[0].endpointUrl;
console.log(" the primary server endpoint url is ", endpointUrl );
});
}
server.initialize(post_initialize);

这是我的1000data.csv

Machine Unit,Air Temperature,Water Temperature,Heat Temperature,Room Temperature,Date,Time
1,61,54,87,24,12/3/2016,8:39AM
2,41,57,92,23,29/9/2016,3:51PM
3,39,53,89,25,22/12/2016,5:30PM

最佳答案

您可以添加一个函数来添加监控并获取在数据更改时调用的回调。您可以在其中将数据添加到 mongoDB。

该函数可能如下所示:

...

function addMonitoring(callback) {
const monitoredItem = the_subscription.monitor({
nodeId: opcua.resolveNodeId("ns=2000;s=TEST"),
attributeId: opcua.AttributeIds.Value
}, {
samplingInterval: 100,
discardOldest: true,
queueSize: 10
},
opcua.read_service.TimestampsToReturn.Both
);

monitoredItem.on("changed", dataValue => {
if (dataValue === undefined || dataValue.value === null) {
callback(`The subscription for the Opc Servernode failed.`);
} else {
callback(null, dataValue.value.value);
}
});
}

...

现在,在第 5 步,您可以添加如下内容:

import mongooseModel from 'mongooseModel';

...

setTimeout(function() {
the_subscription.terminate();
}, 10000000);

addMonitoring((error, value) => {
if (!error) {
mongooseModel.findByIdAndUpdate(1 /*your id*/ , {
$push: {
"data": value
}
})
} else {
// handle error
}
});

模型可能如下所示:

...

const schema = mongoose.Schema({
data: {
type: [Number],
required: true
}
timestamp: {
type: Date,
default: Date.now,
},
});

export default mongoose.model("TestData", schema);

...

数据库的架构取决于您要保存的数据

关于javascript - 将数据附加/插入到 MongoDB 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41459220/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com