gpt4 book ai didi

elasticsearch - Logstash:将两个日志合并为一个输出文档

转载 作者:行者123 更新时间:2023-11-29 02:48:19 24 4
gpt4 key购买 nike

我已将 syslog 设置为使用以下过滤器将日志发送到 logstash:

output {
elasticsearch
{ hosts => ["localhost:9200"]
document_id => "%{job_id}"
}

}
filter {
grok {
overwrite => ["message"]
}
json {
source => "message"
}
}

我的一个应用程序的典型消息将具有初始状态和 job_id:

{"job_id": "xyz782", state: "processing", job_type: "something"}

几分钟后,另一个日志将具有相同的log_id、不同的状态和处理时间:

{"job_id": "xyz782", state:"failed", processing_time: 12.345}

这些字段已正确加载,但创建了两个文档。我想要的是只为初始日志创建一个文档,第二个日志而不是更新第一个日志,这意味着更新的文档将具有以下字段:

{"job_id": "xyz782", state: "failed", job_type: "something", processing_time: 12.345}

正如您在我的 logstash conf 输出中看到的那样,我使用 job_id 作为文档 ID,但是,第二条消息似乎替换了第一条消息中的字段,而且还删除了第一条消息中的所有字段例如,在第二个消息中,第一个消息中出现的 job_type 字段没有出现在最终文档中。这可能与 json 两次都来自同一字段“消息”这一事实有关。是否有另一种方法可以将两个日志消息合并到 logstash 中的一个文档中?

最佳答案

您可以使用 aggregate过滤器以执行此操作。聚合过滤器支持基于公共(public)字段值将多个日志行聚合到一个事件中。在您的情况下,公共(public)字段将是 job_id 字段。

然后我们需要另一个字段来检测应该聚合的第一个事件和第二个事件。在您的情况下,这将是 state 字段。

因此,您只需向现有的 Logstash 配置添加另一个过滤器,如下所示:

filter {
...your other filters

if [state] == "processing" {
aggregate {
task_id => "%{job_id}"
}
} else if [state] == "failed" {
aggregate {
task_id => "%{job_id}"
end_of_task => true
timeout => 120
}
}
}

您可以根据作业运行的时间自由调整超时(以秒为单位)。

关于elasticsearch - Logstash:将两个日志合并为一个输出文档,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35203391/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com