gpt4 book ai didi

node.js - 使用 PG-Promise 和 PG-Query-Stream 高效地读取、操作和插入数据

转载 作者:太空宇宙 更新时间:2023-11-04 00:00:42 28 4
gpt4 key购买 nike

我希望执行以下操作。

  1. 使用 group by 查询查询大型表以执行值汇总。
  2. 通过例程运行这些记录以添加一些附加数据
  3. 将它们高效地插入数据库。

我尝试使用 pg-query-stream 将数据作为流读取出来,然后将这些记录按批处理进行计数,例如一次 1000 个,一旦达到批量限制,就使用 pg-promise pgp.helpers.insert 插入数据。

我遇到的问题是,我不太清楚如何让流正确暂停,以便在继续之前完成插入,特别是在 .on('end') 事件处理程序中。

我尝试过的代码如下

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

/**
 * Inserts one batch of records while the source stream is paused.
 *
 * Pauses `stream`, builds a multi-row INSERT via pg-promise's
 * `helpers.insert`, optionally appends an ON CONFLICT expression,
 * runs it, then resumes the stream on success.
 *
 * @param {Object} tenant          - project object exposing `db` (pg-promise database) — shape assumed from usage, TODO confirm
 * @param {Object} stream          - the readable query stream; must support pause()/resume()
 * @param {Array<Object>} records  - mapped rows to insert in this batch
 * @param {Object} insertColumnSet - pg-promise ColumnSet describing the target table
 * @param {Object} [options]       - { onConflictExpression?: string }
 * @returns {Promise<void>} resolves when the batch insert completes.
 *   Fix: the original discarded this promise and rethrew inside `.catch()`,
 *   which produced an unhandled rejection; now the rejection propagates to
 *   the caller, who can await/handle it.
 */
const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
  // Stop the flow of 'data' events while the insert is in flight.
  stream.pause()
  const t0 = performance.now()

  let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)
  if (options.onConflictExpression) {
    query += options.onConflictExpression
  }

  // Return the chain so callers can await completion / handle failures.
  // On failure the stream intentionally stays paused (matches prior behavior).
  return tenant.db.none(query)
    .then(() => {
      const t1 = performance.now()
      console.log('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
      stream.resume()
    })
}

// Streams rows produced by `sql`, maps each row through `recordMapper`, and
// bulk-inserts them in batches of `options.batchSize` (default 1000) via
// batchInsertData above.
// NOTE(review): the calls to batchInsertData from the 'data' and 'end'
// handlers are not awaited, so the 'end' handler can log totals — and the
// outer promise can resolve — before the in-flight inserts actually finish.
module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
try {
// NOTE(review): wrapping tenant.db.stream's own promise in `new Promise` is
// the explicit promise-construction anti-pattern; returning the chain
// directly would be simpler and harder to leave dangling.
return new Promise((resolve, reject) => {
const query = new QueryStream(sql)

// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null

let records = []      // buffer for the current (partial) batch
let batchNumber = 1
let recordCount = 0   // total rows seen across all batches

let t0 = performance.now()
tenant.db.stream(query, (stream) => {
stream.on('data', (record) => {
const mappedRecord = recordMapper(record)
records.push(mappedRecord)
recordCount++

// Flush a full batch; batchInsertData pauses/resumes the stream itself,
// but since the promise is dropped here, errors in it go unhandled.
if (records.length === options.batchSize) {
batchInsertData(tenant, stream, records, columnSet, options)
records = []
console.log(`Batch ${batchNumber} done`)
batchNumber++
}
})
stream.on('end', () => {
// If any records are left that are not part of a batch insert here.
if (records.length !== 0) {
batchInsertData(tenant, stream, records, columnSet, options)
records = []
console.log(`Batch ${batchNumber} done`)
batchNumber++
console.log('Total Records: ' + recordCount)
let t1 = performance.now()
console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
} else {
console.log('Total Records: ' + recordCount)
let t1 = performance.now()
console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
}
})
// NOTE(review): throwing inside an event handler cannot be caught by the
// surrounding promise chain — it becomes an uncaught exception. This
// handler should call reject(error) instead.
stream.on('error', (error) => {
throw error
})
})
.then(data => {
resolve()
})
.catch(error => {
console.log('ERROR:', error)
reject(error)
})
})
} catch (err) {
// Defensive only: the body just constructs a promise, so this is
// effectively unreachable for async failures.
throw err
}
}

我不确定我正在尝试的方法是否是最好的方法。我根据能找到的有关 pg-promise 和 Streams 的文档尝试了一些不同的方法,但都没有成功。

非常感谢任何帮助/建议。

谢谢

保罗

尝试 2

下面是我根据数据导入页面,第二次尝试使用 getNextData 和序列(sequence)。我正在努力弄清楚如何将流接入其中,以便在插入之前每次只提取一批数据。

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

// Second attempt: drive the reads with pg-promise's transaction.sequence(),
// pulling one batch per sequence step via getNextData() and inserting it
// inside a single 'massive-insert' transaction.
module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {

try {
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null

const query = new QueryStream(sql)

// Intended to resolve with the next batch of rows, or null when exhausted.
// NOTE(review): an async promise-executor is an anti-pattern — an exception
// thrown inside it does not reject the returned promise.
// NOTE(review): the trailing resolve(null) runs unconditionally after the
// await, so the promise may be resolved a second time; the extra call is
// silently ignored, which can mask the "batch ready" result.
function getNextData(transaction, index) {
return new Promise(async (resolve, reject) => {
if (index < options.batchSize) {
let count = 1
await transaction.stream(query, async (stream) => {
let records = []
// Uses pg-promise's internal spex stream.read helper to consume the
// stream in chunks — presumably streamData is a chunk array; TODO confirm.
await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {
stream.resume()
count++
console.log(count, streamIndex, streamData)

records.push(streamData[0])

if (records.length === options.batchSize) {
stream.pause()
resolve(records)
}
}, {readChunks: true})

})
}
resolve(null)
})
}

return tenant.db.tx('massive-insert', (transaction) => {
return transaction.sequence((index) => {
return getNextData(transaction, index)
.then((records) => {
// NOTE(review): `records > 0` compares an array to a number via string
// coercion (e.g. [1,2] -> "1,2" -> NaN), so this check is broken;
// presumably `records && records.length > 0` was intended.
if (records > 0) {
let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)

if (options.onConflictExpression) {
query += options.onConflictExpression
}

const i0 = performance.now()
return transaction.none(query)
.then(() => {
let i1 = performance.now()
console.log('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
})
}
})
})
})
} catch (err) {
// Re-throws synchronous setup failures (e.g. bad SQL construction).
throw err
}
}

最佳答案

我使用稍微不同的方法来完成这项工作,更专注于直接使用流,同时仍然使用 pg-promise 来处理数据库端。

const BatchStream = require('batched-stream')
const { performance } = require('perf_hooks')
const { Transform, Writable } = require('stream')

// Accepted answer: pipe the query stream through a batching transform into a
// Writable that performs the bulk insert. Backpressure is handled by the
// stream machinery itself — the Writable's write() callback throttles the
// source — instead of manual pause()/resume().
module.exports = async (tenant, sql, columnSet, recordMapper, options = {}) => {

try {
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null

const query = new tenant.lib.QueryStream(sql)

const stream = tenant.db.client.query(query)

return new Promise((resolve, reject) => {
// We want to process this in batches
const batch = new BatchStream({size : options.batchSize, objectMode: true, strictMode: false})

// We use a write stream to insert the batch into the database
let insertDatabase = new Writable({
objectMode: true,
// `records` is one batch (array of rows) emitted by BatchStream. The
// async IIFE bridges async/await onto the callback-style write(): the
// callback is only invoked after the insert settles, which is what
// provides the backpressure on the source stream.
write(records, encoding, callback) {
(async () => {

try {
/*
If we have a record mapper then do it here prior to inserting the records.
This way is much quicker than doing it as a transform stream below by
about 10 seconds for 100,000 records
*/
if (recordMapper) {
records = records.map(record => recordMapper(record))
}

let query = tenant.lib.pgp.helpers.insert(records, columnSet)

if (options.onConflictExpression) {
query += options.onConflictExpression
}

const i0 = performance.now()
await tenant.db.none(query)
.then(() => {
let i1 = performance.now()
console.log('Inserted ' + records.length + ' records in ' + ((i1 - i0) / 1000) + ' (seconds).')
})

} catch(e) {
// Surface the insert failure through the stream's 'error' event.
return callback(e)
}

callback()
})()
}
})

// Process the stream
const t0 = performance.now()
stream
// Break it down into batches
.pipe(batch)
// Insert those batches into the database
.pipe(insertDatabase)
// Once we get here we are done :)
.on('finish', () => {
const t1 = performance.now()
console.log('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
resolve()
})
// NOTE(review): this only catches errors emitted by insertDatabase;
// pipe() does not forward errors from `stream` or `batch`, so a source
// failure could leave this promise pending — consider stream.pipeline.
.on('error', (error) => {
reject(error)
})

})
} catch (err) {
// Re-throws synchronous setup failures before the pipeline starts.
throw err
}
}

关于node.js - 使用 PG-Promise 和 PG-Query-Stream 高效地读取、操作和插入数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54831211/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com