- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我一直在尝试使用 logstash 解析我的 python 回溯日志。我的日志如下所示:
[pid: 26422|app: 0|req: 73/73] 192.168.1.1 () {34 vars in 592 bytes} [Wed Feb 18 13:35:55 2015] GET /data => generated 2538923 bytes in 4078 msecs (HTTP/1.1 200) 2 headers in 85 bytes (1 switches on core 0)
Traceback (most recent call last):
File "/var/www/analytics/parser.py", line 257, in parselogfile
parselogline(basedir, lne)
File "/var/www/analytics/parser.py", line 157, in parselogline
pval = understandpost(parts[3])
File "/var/www/analytics/parser.py", line 98, in understandpost
val = json.loads(dct["events"])
File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode
obj, end = self.scan_once(s, idx)
ValueError: Unterminated string starting at: line 1 column 355 (char 354)
到目前为止,我已经能够解析日志,除了最后一行,即
ValueError: Unterminated string starting at: line 1 column 355 (char 354)
我正在使用多行过滤器来执行此操作。我的 logstash 配置看起来像这样:
filter {
multiline {
pattern => "^Traceback"
what => "previous"
}
multiline {
pattern => "^ "
what => "previous"
}
grok {
match => [
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
]
}
if "_grokparsefailure" in [tags] {
multiline {
pattern => "^.*$"
what => "previous"
negate => "true"
}
}
if "_grokparsefailure" in [tags] {
grok {
match => [
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
]
remove_tag => ["_grokparsefailure"]
}
}
}
但我的最后一行没有解析。相反,它仍然给我一个错误,并且还成倍地增加了处理时间。关于如何解析回溯的最后一行的任何建议?
最佳答案
好吧,我找到了解决办法。所以我采用的方法是忽略以“[”开头的日志消息的开头,所有其他行都将附加在上一条消息的末尾。然后可以应用 grok 过滤器并可以解析回溯。请注意,我必须应用两个 grok 过滤器:
当有traceback的时候用GREEDYDATA获取traceback。
当没有回溯时,GREEDYDATA 解析失败,我将不得不删除 _grokparsefailure 标记,然后再次应用没有 GREEDYDATA 的 grok。这是在 if block 的帮助下完成的。
最终的 logstash 过滤器看起来像这样:
filter {
multiline {
pattern => "^[^\[]"
what => "previous"
}
grok {
match => [
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
]
}
if "_grokparsefailure" in [tags] {
grok {
match => [
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)"
]
remove_tag => ["_grokparsefailure"]
}
}
else {
mutate {
convert => {"traceback" => "string"}
}
}
date {
match => ["timestamp", "dd/MM/YYYY:HH:MM:ss Z"]
locale => en
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "Useragent"
}
}
或者,如果您不想使用 if block 来检查另一个 grok 模式并删除 _grokparsefailure
,您可以使用第一个 grok 过滤器通过包含多个来检查这两种消息类型grok 过滤器的 match
数组中的消息模式检查。可以这样做:
grok {
match => [
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)",
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
]
}
还有第三种方法(可能是最优雅的一种)。它看起来像这样:
grok {
match => [
"message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)(%{GREEDYDATA:traceback})?"
]
}
请注意,在该方法中,可选的字段必须包含在“()?”中。在这里,(%{GREEDYDATA:traceback})?
因此,grok 过滤器看到该字段是否可用,就会对其进行解析。否则,它将被跳过。
关于python - 使用 logstash 解析包含 python 回溯的日志,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30977603/
我在 logstash 中使用文件作为日志的输入。我的日志文件每天轮换,所以我想问一下我们如何配置 logstash 的文件插件,以便它可以处理每天轮换的文件。除此之外,文件节拍也可以进行日志轮换。
我正在我公司服务的服务器上实现监控工具。为此,我正在使用 logstash。我们的应用程序通过 log4net udp appender 将它们的日志发送到 logstash(输入 udp),然后 l
我期待对 Logstash 中收到的输入使用数学运算,但无法看到任何此类 过滤器 . 输入如下: { "user_id": "User123", "date": "2016 Jun 26 12
我对 logstash 和 Elasticsearch 很陌生。我正在尝试将日志文件存储在 elasticsearch 和平面文件中。我知道 logstash 支持两种输出。但是它们是同时处理的吗?还
寻求一些入门帮助...我已经安装了 Logstash(以及 ElasticSearch),但我正在为我的第一个过滤器而苦苦挣扎。 作为测试,我将其配置为从包含 6 行的修剪日志文件中读取,每行以时间戳
我已经按照下面提到的架构实现了 logstash(在测试中)。 成分分解 Rsyslog 客户端:默认情况下,所有 Linux destros 中都安装了 syslog,我们只需要配置 rsyslog
我无法在 LogStash 中使用负正则表达式(如 the docs 中所述) 考虑以下正则表达式,它可以正常工作以检测已分配值的字段: if [remote_ip] =~ /(.+)/ {
我在云中使用两台服务器,在一台服务器上 (A) 我安装了 filebeat,在第二台服务器上 (B) 我安装了 logstash、elasticsearch 和 kibana。所以我在 logstas
我有一个来自 Windows 事件日志的 IP 地址字段,它在 IP 地址前面包含类似“::fffff:”的字符。我无法在此处更改源,因此我必须在 Logstash 中修复此问题。 我一定很不擅长谷歌
我正在尝试将此日期结构 YYYY-MM-DD_HH-MM-SS 转换为 logstash 中的 YYYY-MM-DD HH:MM:SS。这是我的过滤器: filter { csv {
我正在使用 Logstash(以 Kibana 作为 UI)。我想从我的日志中提取一些字段,以便我可以在 UI 的 LHS 上按它们进行过滤。 我日志中的示例行如下所示: 2013-07-04 00:
如何将此 Logstash 过滤器更改为不区分大小写? filter { if "foo" in [message] { mutate { add_field => { "Alert_le
我正在尝试将事件消息与几个正则表达式相匹配。我打算使用 grep 过滤器,但它已被弃用,所以我正在尝试使用否定的方法。 我正在寻找的功能是删除所有事件,除非消息匹配多个正则表达式。 过滤器波纹管不起作
我用过logstash的RPM安装。因此,logstash 作为 linux 服务运行。我想调试一个管道,需要查看的内容 output { stdout { codec => rubydebug
如何在 logstash 中比较日期。我想将日期与恒定日期值进行比较。以下代码在 Logstash 中失败并出现 ruby 异常。 if [start_dt] { "str_dt" => "20
我正在从logstash-1.1.3升级到logstash-1.3.3。问题是,1.1.3 中的标签和字段配置在 1.3.3 版本中已弃用。这些允许仅将那些事件发送到具有给定标签或包含给定字段的输出。
我想在同一台机器上运行两个 logstash 实例。现在我使用命令启动 logstash。logstash.bat agent -f logstashconf.conf。但是当我要通过相同的命令启动第
我有这种格式的 php 日志 [Day Mon DD HH:MM:SS YYYY] [Log-Type] [client ] : [Day Mon DD HH:MM:SS YYYY] [Log-Ty
我的 logstash 中的一些请求使 http 输出插件失败,并且日志显示 [2020-10-16T18:44:54,574][ERROR][logstash.outputs.http ] [HTT
我正在探索Logstash来接收HTTP上的输入。我已经使用以下方法安装了http插件: 插件安装logstash-input-http 安装成功。然后我尝试使用以下命令运行logstash: log
我是一名优秀的程序员,十分优秀!