
json - Logstash: parsing complex multi-line JSON from a log file into ElasticSearch


Let me start by saying that I have tried as many of the examples here as I could, and still cannot get this to work. I am not sure whether it is because of the complexity of the JSON in the log file.

I want to take the sample log entry below, have Logstash read it, and send the JSON portion to ElasticSearch as JSON.

Here is the (shortened) example:

[0m[0m16:02:08,685 INFO  [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: {
  "appName": "SomeApp",
  "freeMemReqStartBytes": 544577648,
  "freeMemReqEndBytes": 513355408,
  "totalMem": 839385088,
  "maxMem": 1864368128,
  "anonymousUser": false,
  "sessionId": "zz90g0dFQkACVao4ZZL34uAb",
  "swAction": {
    "clock": 0,
    "clockStart": 1437766438950,
    "name": "General",
    "trackingMemory": false,
    "trackingMemoryGcFirst": true,
    "memLast": 0,
    "memOrig": 0
  },
  "remoteHost": "127.0.0.1",
  "remoteAddr": "127.0.0.1",
  "requestMethod": "GET",
  "mapLocalObjectCount": {
    "FinanceEmployee": {
      "x": 1,
      "singleton": false
    },
    "QuoteProcessPolicyRef": {
      "x": 10,
      "singleton": false
    },
    "LocationRef": {
      "x": 2,
      "singleton": false
    }
  },
  "theSqlStats": {
    "lstStat": [
      {
        "sql": "select * FROM DUAL",
        "truncated": false,
        "truncatedSize": -1,
        "recordCount": 1,
        "foundInCache": false,
        "putInCache": false,
        "isUpdate": false,
        "sqlFrom": "DUAL",
        "usingPreparedStatement": true,
        "isLoad": false,
        "sw": {
          "clock": 104,
          "clockStart": 1437766438970,
          "name": "General",
          "trackingMemory": false,
          "trackingMemoryGcFirst": true,
          "memLast": 0,
          "memOrig": 0
        },
        "count": 0
      },
      {
        "sql": "select * FROM DUAL2",
        "truncated": false,
        "truncatedSize": -1,
        "recordCount": 0,
        "foundInCache": false,
        "putInCache": false,
        "isUpdate": false,
        "sqlFrom": "DUAL2",
        "usingPreparedStatement": true,
        "isLoad": false,
        "sw": {
          "clock": 93,
          "clockStart": 1437766439111,
          "name": "General",
          "trackingMemory": false,
          "trackingMemoryGcFirst": true,
          "memLast": 0,
          "memOrig": 0
        },
        "count": 0
      }
    ]
  }
}

None of the Logstash configurations I have tried has worked. The closest so far is:

input {
  file {
    codec => multiline {
      pattern => '\{(.*)\}'
      negate => true
      what => previous
    }
    path => [ '/var/log/logstash.log' ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    cluster => "logstash"
    index => "logstashjson"
  }
}

I have also tried:

input {
  file {
    type => "json"
    path => "/var/log/logstash.log"
    codec => json # also tried json_lines
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    cluster => "logstash"
    codec => "json" # also tried json_lines
    index => "logstashjson"
  }
}

I just want to take the JSON posted above and send it to ElasticSearch "as is", just as if I had done a cURL PUT with that file. Any help is appreciated, thank you!
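For reference, a manual index request of the kind the question compares against might look like the sketch below. The host, index name, document type, and id are assumptions, and entry.json is a hypothetical file containing just the JSON payload:

# Index the raw JSON document directly, bypassing Logstash (all names are assumptions)
curl -XPUT 'http://localhost:9200/logstashjson/entry/1' -d @entry.json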

UPDATE

With Leonid's help, here is the configuration I have now:

input {
  file {
    codec => multiline {
      pattern => "^\["
      negate => true
      what => previous
    }
    path => [ '/var/log/logstash.log' ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok {
    match => { "message" => "^(?<rubbish>.*?)(?<logged_json>{.*)" }
  }
  json {
    source => "logged_json"
    target => "parsed_json"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    cluster => "logstash"
    index => "logstashjson"
  }
}

Best Answer

Sorry, I cannot comment yet, so I am posting an answer. You are missing a document_type in the elasticsearch config; how else would it be inferred?
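A minimal sketch of what that could look like in the output section, assuming the same index as above (the type name "json" is only an illustration):

elasticsearch {
  cluster => "logstash"
  index => "logstashjson"
  document_type => "json" # hypothetical type name, pick whatever fits your data
}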


OK, after digging through the Logstash reference and working closely with @Ascalonian, we came up with the following config:

input {
  file {

    # In the input you need to configure the multiline codec properly.
    # You need to match the line that has the timestamp at the start,
    # and then say "everything that is NOT this line should go to the previous line".
    # The pattern could be improved to handle the case when the json array starts
    # at the first char of the line, but it is sufficient for now.
    codec => multiline {
      pattern => "^\["
      negate => true
      what => previous
      max_lines => 2000
    }

    path => [ '/var/log/logstash.log' ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {

  # Extract the json part of the message string into a separate field
  grok {
    match => { "message" => "^.*?(?<logged_json>{.*)" }
  }

  # Replace newlines in the json string, since the json filter below
  # cannot deal with those. It is also a good time to delete unwanted fields.
  mutate {
    gsub => [ 'logged_json', '\n', '' ]
    remove_field => [ "message", "@timestamp", "host", "path", "@version", "tags" ]
  }

  # Parse the json and remove the string field upon success
  json {
    source => "logged_json"
    remove_field => [ "logged_json" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    cluster => "logstash"
    index => "logstashjson"
  }
}
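To try this end to end, one option is to start Logstash against the config file with -f and then query the index; the config path and host below are assumptions:

# Start Logstash with the config above (the config path is an assumption)
bin/logstash -f /etc/logstash/conf.d/multiline-json.conf

# Verify that the parsed document landed in the index (host is an assumption)
curl 'http://localhost:9200/logstashjson/_search?pretty'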

About json - Logstash: parsing complex multi-line JSON from a log file into ElasticSearch, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/31713531/
