gpt4 book ai didi

node.js - 如何使用 createWriteStream 将 JSON 流式传输到 BigQuery 表?

转载 作者:行者123 更新时间:2023-12-03 18:18:15 26 4
gpt4 key购买 nike

我有一个很大的 JSON 文件,我想对其稍作转换并作为新表发送到 Google BigQuery。我过去在 node.js 中使用过流效果很好,我认为这是解决这个问题的一个不错的解决方案。我正在使用 official Google node.js BigQuery API 。我可以毫无问题地创建一个具有正确架构的表。我想我已经准备好了解决方案。程序完成得很好,但最终没有数据登陆我的 BigQuery 表。

相关代码如下

我的 node.js 流代码:

fs.createReadStream('testfile.json')
.pipe(require('split')())
.pipe(require('event-stream').mapSync((data) => {
if (data.length > 1) {
let obj;
try {
obj = JSON.parse('{' + (data[data.length - 1] === ',' ? data.slice(0, data.length - 1) : data) + '}');
} catch (e) {
console.error('error parsing!', e, data);
}
let user = Object.keys(obj)[0];
let company = obj[user][0];
let item = {
user: user,
company: company
};
console.log(item);
return JSON.stringify(item);
}
}))
.pipe(table.createWriteStream('json'))
.on('error', (e) => {
console.error('Error!', e);
})
.on('complete', (job) => {
console.log('All done!', job);
});
testfile.json 看起来像这样:
{
"a":["a company", "1234567"],
"b":["b company", "1234214"],
"c":["c company", "12332231"]
}

当我运行程序时,输出如下所示:
{ user: 'a', company: 'a company' }
{ user: 'b', company: 'b company' }
{ user: 'c', company: 'c company' }
All done! Job {
metadata:
{ kind: 'bigquery#job',
/* lots more data here */

docs for createWriteStream 并没有非常详细地说明数据应该采用什么格式以泵入写入流,所以我觉得我有点盲目。

最佳答案

找出我需要做什么来 a) 使导入工作和 b) 更清楚地了解正在发生的事情。

修复导入

  • 指定您将向 createWriteStream 提供一个以换行符分隔的 JSON 文件。 :
    let firehose = table.createWriteStream({
    sourceFormat: 'NEWLINE_DELIMITED_JSON'
    });


  • 确保 JSON 转换器返回换行符分隔的 JSON:
    return JSON.stringify(item) + '\n';

  • 流和作业状态的可见性

    消防水管 writeStreamerrorcomplete您可以订阅的事件,但表的 writeStreamcomplete event 提供了一个 Job 作为参数,它本身有更多的事件你可以订阅以获得更多的洞察力。
    let moment = require('moment');
    firehose.on('error', (e) => {
    console.error('firehose error!', e);
    });
    firehose.on('complete', (job) => {
    console.log('Firehose into BigQuery emptied! BigQuery Job details:', job.metadata.status.state, job.metadata.jobReference.jobId);
    console.log('Now we wait for the Job to finish...');
    job.on('complete', (job) => {
    console.log('BigQuery Job loaded', job.statistics.load.inputFileBytes, 'bytes yielding', job.statistics.load.outputRows, 'rows and', job.statistics.load.badRecords, 'bad records in', moment(parseInt(job.statistics.endTime)).from(moment(parseInt(job.statistics.startTime)), true));
    });
    job.on('error', (e) => { console.error('Job error', e); });
    });

    关于node.js - 如何使用 createWriteStream 将 JSON 流式传输到 BigQuery 表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49097825/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com