gpt4 book ai didi

javascript - 使用 Mongoose 架构导入 CSV

转载 作者:太空宇宙 更新时间:2023-11-04 01:48:29 24 4
gpt4 key购买 nike

目前,我需要将一个大型 CSV 文件推送到 mongo DB 中,并且值的顺序需要确定数据库条目的键:

CSV 文件示例:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0

将其解析为数组的代码:

var fs = require("fs");

var csv = require("fast-csv");

fs.createReadStream("rank.txt")
.pipe(csv())
.on("data", function(data){
console.log(data);
})
.on("end", function(data){
console.log("Read Finished");
});

代码输出:

[ '9',
'1557',
'358',
'286',
'Mutantville',
'4368',
'2358026',
'',
'M',
'0',
'0',
'0',
'1',
'0' ]
[ '9',
'1557',
'359',
'147',
'Wroogny',
'4853',
'2356061',
'',
'D',
'0',
'0',
'0',
'1',
'0' ]

如何将数组插入到我的 mongoose 架构中以进入 mongo db?

架构:

var mongoose = require("mongoose");


var rankSchema = new mongoose.Schema({
serverid: Number,
resetid: Number,
rank: Number,
number: Number,
name: String,
land: Number,
networth: Number,
tag: String,
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});

module.exports = mongoose.model("Rank", rankSchema);

数组的顺序需要与模式的顺序匹配,例如在数组中,第一个数字 9 需要始终保存为键“serverid”等。我正在使用 Node.JS

最佳答案

您可以通过获取 headers 使用 fast-csv 来完成此操作来自模式定义,它将返回解析的行作为“对象”。实际上,您有一些不匹配的地方,因此我已对它们进行了更正标记:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

try {
const conn = await mongoose.connect(uri);

await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);

console.log(headers);

await new Promise((resolve,reject) => {

let buffer = [],
counter = 0;

let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}

stream.resume();

})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});

});


} catch(e) {
console.error(e)
} finally {
process.exit()
}


})()

只要架构实际上与提供的 CSV 一致,就可以了。这些是我可以看到的更正,但如果您需要以不同方式对齐实际的字段名称,那么您需要进行调整。但基本上有一个 Number在有 String 的位置本质上是一个额外的字段,我认为它是 CSV 中的空白字段。

一般情况是从架构中获取字段名称数组,并在创建 csv 解析器实例时将其传递到选项中:

let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))

一旦你真正这样做了,你就会得到一个“对象”而不是一个数组:

{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}

不用担心“类型”,因为 Mongoose 会根据模式转换值。

剩下的事情发生在 data 的处理程序中事件。为了获得最大效率,我们使用 insertMany() 每 10,000 行才写入数据库一次。它实际上如何到达服务器并进行处理取决于 MongoDB 版本,但根据您为单个集合导入的平均字段数(在内存使用和编写合理的网络请求方面的“权衡”),10,000 个应该是相当合理的。如有必要,请减小数字。

重要的部分是将这些调用标记为 async功能和await insertMany() 的结果在继续之前。我们还需要 pause() 流和 resume() 在每个项目上,否则我们将面临覆盖 buffer 的风险在实际发送之前插入的文档数量。 pause() resume() 有必要在管道上施加“背压”,否则元素会不断“出来”并触发 data事件。

当然,对 10,000 个条目的控制要求我们在每次迭代和流完成时进行检查,以便清空缓冲区并将任何剩余文档发送到服务器。

这确实是您想要做的,因为您当然不想在 data 的“每次”迭代中都向服务器发出异步请求。事件或基本上无需等待每个请求完成。您可以不检查“非常小的文件”,但对于任何现实世界的负载,由于尚未完成的“飞行中”异步调用,您肯定会超出调用堆栈。

<小时/>

仅供引用 - a package.json用过的。 mz 是可选的,因为它只是一个现代化的 Promise我只是习惯使用标准 Node “内置”库的启用库。该代码当然可以与fs完全互换。模块。

{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
<小时/>

实际上,使用 Node v8.9.x 及更高版本,我们甚至可以通过 AsyncIterator 的实现使这变得更简单。通过 stream-to-iterator 模块。仍然在 Iterator<Promise<T>>模式,但应该在 Node v10.x 成为稳定的 LTS 之前这样做:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

try {
const conn = await mongoose.connect(uri);

await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);

//console.log(headers);

let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));

const iterator = await streamToIterator(stream).init();

let buffer = [],
counter = 0;

for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;

if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}

if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}

} catch(e) {
console.error(e)
} finally {
process.exit()
}

})()

基本上,所有流“事件”处理以及暂停和恢复都被一个简单的 for 所取代。循环:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}

简单!这会在以后的 Node 实现中通过 for..await..of 进行清理。当它变得更加稳定时。但上面的代码在指定版本及以上版本上运行良好。

关于javascript - 使用 Mongoose 架构导入 CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50560719/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com