gpt4 book ai didi

node.js - 使用 Node.js 从 Kafka 到 Elasticsearch 消费

转载 作者:太空宇宙 更新时间:2023-11-03 23:31:29 26 4
gpt4 key购买 nike

我知道有相当多的node.js 模块实现了获取消息并写入弹性的Kafka 消费者。但我只需要每个消息中的一些字段,而不是全部。是否有我不知道的现有解决方案?

最佳答案

问题是要求从 node.js 获取示例。 kafka-node module provides a very nice mechanism for getting a Consumer ,您可以将其与 elasticsearch-js 结合使用模块:

// configure Elasticsearch client
var elasticsearch = require('elasticsearch');
var esClient = new elasticsearch.Client({
// ... connection details ...
});
// configure Kafka Consumer
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client();
var consumer = new Consumer(
client,
[
// ... topics / partitions ...
],
{ autoCommit: false }
);

consumer.on('message', function(message) {
if (message.some_special_field === "drop") {
return; // skip it
}

// drop fields (you can use delete message['field1'] syntax if you need
// to parse a more dynamic structure)
delete message.field1;
delete message.field2;
delete message.field3;

esClient.index({
index: 'index-name',
type: 'type-name',
id: message.id_field, // ID will be auto generated if none/unset
body: message
}, function(err, res) {
if (err) {
throw err;
}
});
});

consumer.on('error', function(err) {
console.log(err);
});

注意:当您发送大量消息时,使用 Index API 并不是一个好的做法,因为它要求 Elasticsearch 每个操作创建一个线程,这显然是浪费的,并且最终会导致如果线程池因此耗尽,则拒绝请求。在任何批量摄取情况下,更好的解决方案是考虑使用类似 Elasticsearch Streams 的内容。 (或在其之上构建的Elasticsearch Bulk Index Stream),它构建在官方elasticsearch-js客户端之上。但是,我从未使用过这些客户端扩展,因此我真的不知道它们的工作效果如何,但使用只会替换我显示索引编制的部分。

我不相信 Node.js 方法在维护和复杂性方面实际上比下面的 Logstash 方法更好,因此我将这两种方法留在这里供引用。

<小时/>

更好的方法可能是从 Logstash 使用 Kafka,然后将其发送到 Elasticsearch。

您应该能够使用 Logstash 使用 Kafka input 直接执行此操作和 Elasticsearch output .

Logstash 管道中的每个文档都称为“事件”。 Kafka 输入假设它将接收传入的 JSON(可通过其编解码器配置),这将使用该消息中的所有字段填充单个事件。

然后,您可以删除那些您不感兴趣处理的字段,或者有条件地删除整个事件。

input {
# Receive from Kafka
kafka {
# ...
}
}

filter {
if [some_special_field] == "drop" {
drop { } # skip the entire event
}

# drop specific fields
mutate {
remove_field => [
"field1", "field2", ...
]
}
}

output {
# send to Elasticsearch
elasticsearch {
# ...
}
}

当然,您需要配置 Kafka 输入(来自第一个链接)和 Elasticsearch 输出(以及第二个链接)。

关于node.js - 使用 Node.js 从 Kafka 到 Elasticsearch 消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37687598/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com