gpt4 book ai didi

node.js - HTTP POST Elasticsearch 事件流批量

转载 作者:行者123 更新时间:2023-12-03 02:05:00 25 4
gpt4 key购买 nike

我有一个 node.js 程序,它使用流来读取文件( nodejs event stream setting a variable per stream )

我想使用相同的程序将此数据写入 Elasticsearch 。我写了一个小写函数

var writeFunction = function(data) {
//console.log(data);
var client = request.newClient("http://localhost:9200");
client.post('/newtest3/1',data,function(err,res,body) {
return console.log(res.statusCode);
});
};

并将其与流媒体联系起来
var processMyFile = function(file) {
var stream = getStream(file);
var nodeName = stream.nodeName;
stream
.pipe(es.split())
.on('end',endFunction)
.pipe(es.map(function(data,cb) {
processFunction(nodeName,data,cb);
}))
.pipe(es.map(function(data,cb) {
writeFunction(data);
}));

}

上述工作按预期异步工作并写入数据,除了它需要很长时间。它似乎也可以作为缓冲区工作,因为写入需要比读取更长的时间。(使用管道的优势)
我知道 Elasticsearch 中有一个批量接口(interface),我可以使用它导入。 Kibana 入门指南 ( http://www.elasticsearch.org/guide/en/kibana/current/using-kibana-for-the-first-time.html) 中的shakesphere.json 示例

这意味着我需要以批量导入所需的格式创建文件,然后运行 ​​curl 程序等。我想避免创建临时文件。

作为流式传输过程的一部分,是否有更简单的方法可以更快地将数据导入 Elasticsearch

最佳答案

elasticsearch-streams将帮助您使用流式传输的批量接口(interface),而无需先编写 json 文件。

我相信您的代码或多或少是这样的:

var TransformToBulk = require('elasticsearch-streams').TransformToBulk
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var client = new require('elasticsearch').Client();

var bulkExec = function(bulkCmds, callback) {
client.bulk({
index : 'newtest3',
type : '1',
body : bulkCmds
}, callback);
};

var ws = new WritableBulk(bulkExec);
var toBulk = new TransformToBulk(function getIndexTypeId(doc) { return { _id: doc.id }; });

var processMyFile = function(file) {
var stream = getStream(file);

stream
.pipe(toBulk)
.pipe(ws)
.on('close', endFunction)
.on('err', endFunction);
}

关于node.js - HTTP POST Elasticsearch 事件流批量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27284363/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com