gpt4 book ai didi

nginx日志导入elasticsearch的方法示例

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 25 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章nginx日志导入elasticsearch的方法示例由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

将nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分 ,以及将日志写入elasticsearch后的索引的创建.

1、配置nginx日志格式 。

?
1
2
3
4
5
6
7
log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '
             '$status $body_bytes_sent $http_referer '
             '"$http_user_agent" '
             '"$connection" '
             '"$http_cookie" '
             '$request_time '
             '$upstream_response_time' ;

2、安装配置filebeat,启用nginx module 。

?
1
2
3
tar -zxvf filebeat-6.2.4-linux-x86_64. tar .gz -C /usr/local
cd /usr/local ; ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat

启用nginx模块 。

?
1
. /filebeat modules enable nginx

查看模块 。

?
1
. /filebeat modules list

创建配置文件 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
vim /usr/local/filebeat/blog_module_logstash .yml
filebeat.modules:
- module: nginx
  access:
   enabled: true
   var.paths: [ "/home/weblog/blog.cnfol.com_access.log" ]
  #error:
  # enabled: true
  # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]
 
 
output.logstash:
  hosts: [ "192.168.15.91:5044" ]

启动filebeat 。

?
1
. /filebeat -c blog_module_logstash.yml -e

3、配置logstash 。

?
1
2
3
4
tar -zxvf logstash-6.2.4. tar .gz /usr/local
cd /usr/local ; ln -s logstash-6.2.4 logstash
创建一个nginx日志的pipline文件
cd /usr/local/logstash

logstash内置的模板目录 。

?
1
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

编辑 grok-patterns 添加一个支持多ip的正则 。

?
1
FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}

官方grok 。

http://grokdebug.herokuapp.com/patterns# 。

创建logstash pipline配置文件 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#input {
# stdin {}
#}
# 从filebeat接受数据
input {
  beats {
  port => 5044
  host => "0.0.0.0"
  }
}
 
filter {
  # 添加一个调试的开关
  mutate{add_field => { "[@metadata][debug]" => true }}
  grok {
  # 过滤nginx日志
  #match => { "message" => "%{NGINXACCESS_TEST2}" }
  #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
  #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
     match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
  }
  # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
  ruby {
  #code => "event.set('@read_timestamp',event.get('@timestamp'))"
  #将时区改为东8区
  code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
  }
  # 将nginx的日志记录时间格式化
  # 格式化时间 20/May/2015:21:05:56 +0000
  date {
  locale => "en"
  match => [ "[@metadata][webtime]" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  # 将bytes字段由字符串转换为数字
  mutate {
  convert => { "bytes" => "integer" }
  }
  # 将cookie字段解析成一个json
  #mutate {
  # gsub => ["cookies",'\;',',']
  #}
  # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
  if [http_x_forwarded_for] =~ ", " {
      ruby {
          code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
         }
     }
  # 解析ip,获得ip的地理位置
  geoip {
  source => "http_x_forwarded_for"
  # # 只获取ip的经纬度、国家、城市、时区
  fields => [ "location" , "country_name" , "city_name" , "region_name" ]
  }
  # 将agent字段解析,获得浏览器、系统版本等具体信息
  useragent {
  source => "agent"
  target => "useragent"
  }
  #指定要删除的数据
  #mutate{remove_field=>["message"]}
  # 根据日志名设置索引名的前缀
  ruby {
  code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
  }
  # 将@timestamp 格式化为2019.04.23
  ruby {
  code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
  }
  # 设置输出的默认索引名
  mutate {
  add_field => {
   #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
   "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
  }
  }
  # 将cookies字段解析成json
# mutate {
# gsub => [
#  "cookies", ";", ",",
#  "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
#  "cookies_json", ',', '","',
#  "cookies_json", ':', '":"'
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
  # 如果grok解析存在错误,将错误独立写入一个索引
  if "_grokparsefailure" in [tags] {
  #if "_dateparsefailure" in [tags] {
  mutate {
   replace => {
   #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
   "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
   }
  }
  # 如果不存在错误就删除message
  } else {
  mutate{remove_field=>[ "message" ]}
  }
}
 
output {
  if [@metadata][debug]{
  # 输出到rubydebuyg并输出metadata
  stdout{codec => rubydebug{metadata => true }}
  } else {
  # 将输出内容转换成 "."
  stdout{codec => dots}
  # 将输出到指定的es
  elasticsearch {
   hosts => [ "192.168.15.160:9200" ]
   index => "%{[@metadata][index]}"
   document_type => "doc"
  }
  }
}

启动logstash 。

?
1
nohup bin /logstash -f test_pipline2.conf &

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.

原文链接:http://www.zhengdazhi.com/archives/1744 。

最后此篇关于nginx日志导入elasticsearch的方法示例的文章就讲到这里了,如果你想了解更多关于nginx日志导入elasticsearch的方法示例的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com