- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章nginx日志导入elasticsearch的方法示例由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
将nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分 ,以及将日志写入elasticsearch后的索引的创建.
1、配置nginx日志格式 。
1
2
3
4
5
6
7
|
log_format main
'$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '
'$status $body_bytes_sent $http_referer '
'"$http_user_agent" '
'"$connection" '
'"$http_cookie" '
'$request_time '
'$upstream_response_time'
;
|
2、安装配置filebeat,启用nginx module 。
1
2
3
|
tar
-zxvf filebeat-6.2.4-linux-x86_64.
tar
.gz -C
/usr/local
cd
/usr/local
;
ln
-s filebeat-6.2.4-linux-x86_64 filebeat
cd
/usr/local/filebeat
|
启用nginx模块 。
1
|
.
/filebeat
modules
enable
nginx
|
查看模块 。
1
|
.
/filebeat
modules list
|
创建配置文件 。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
vim
/usr/local/filebeat/blog_module_logstash
.yml
filebeat.modules:
- module: nginx
access:
enabled:
true
var.paths: [
"/home/weblog/blog.cnfol.com_access.log"
]
#error:
# enabled: true
# var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]
output.logstash:
hosts: [
"192.168.15.91:5044"
]
|
启动filebeat 。
1
|
.
/filebeat
-c blog_module_logstash.yml -e
|
3、配置logstash 。
1
2
3
4
|
tar
-zxvf logstash-6.2.4.
tar
.gz
/usr/local
cd
/usr/local
;
ln
-s logstash-6.2.4 logstash
创建一个nginx日志的pipline文件
cd
/usr/local/logstash
|
logstash内置的模板目录 。
1
|
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
|
编辑 grok-patterns 添加一个支持多ip的正则 。
1
|
FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}
|
官方grok 。
http://grokdebug.herokuapp.com/patterns# 。
创建logstash pipline配置文件 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#input {
# stdin {}
#}
# 从filebeat接受数据
input {
beats {
port => 5044
host =>
"0.0.0.0"
}
}
filter {
# 添加一个调试的开关
mutate{add_field => {
"[@metadata][debug]"
=>
true
}}
grok {
# 过滤nginx日志
#match => { "message" => "%{NGINXACCESS_TEST2}" }
#match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
#match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
match => {
"message"
=>
'(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)'
}
}
# 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
ruby {
#code => "event.set('@read_timestamp',event.get('@timestamp'))"
#将时区改为东8区
code =>
"event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
}
# 将nginx的日志记录时间格式化
# 格式化时间 20/May/2015:21:05:56 +0000
date
{
locale =>
"en"
match => [
"[@metadata][webtime]"
,
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
# 将bytes字段由字符串转换为数字
mutate {
convert => {
"bytes"
=>
"integer"
}
}
# 将cookie字段解析成一个json
#mutate {
# gsub => ["cookies",'\;',',']
#}
# 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
if
[http_x_forwarded_for] =~
", "
{
ruby {
code =>
'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
}
}
# 解析ip,获得ip的地理位置
geoip {
source
=>
"http_x_forwarded_for"
# # 只获取ip的经纬度、国家、城市、时区
fields => [
"location"
,
"country_name"
,
"city_name"
,
"region_name"
]
}
# 将agent字段解析,获得浏览器、系统版本等具体信息
useragent {
source
=>
"agent"
target =>
"useragent"
}
#指定要删除的数据
#mutate{remove_field=>["message"]}
# 根据日志名设置索引名的前缀
ruby {
code =>
'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
}
# 将@timestamp 格式化为2019.04.23
ruby {
code =>
'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
}
# 设置输出的默认索引名
mutate {
add_field => {
#"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
"[@metadata][index]"
=>
"%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
}
}
# 将cookies字段解析成json
# mutate {
# gsub => [
# "cookies", ";", ",",
# "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
# "cookies_json", ',', '","',
# "cookies_json", ':', '":"'
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
# 如果grok解析存在错误,将错误独立写入一个索引
if
"_grokparsefailure"
in
[tags] {
#if "_dateparsefailure" in [tags] {
mutate {
replace => {
#"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
"[@metadata][index]"
=>
"%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
}
}
# 如果不存在错误就删除message
}
else
{
mutate{remove_field=>[
"message"
]}
}
}
output {
if
[@metadata][debug]{
# 输出到rubydebuyg并输出metadata
stdout{codec => rubydebug{metadata =>
true
}}
}
else
{
# 将输出内容转换成 "."
stdout{codec => dots}
# 将输出到指定的es
elasticsearch {
hosts => [
"192.168.15.160:9200"
]
index =>
"%{[@metadata][index]}"
document_type =>
"doc"
}
}
}
|
启动logstash 。
1
|
nohup
bin
/logstash
-f test_pipline2.conf &
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:http://www.zhengdazhi.com/archives/1744 。
最后此篇关于nginx日志导入elasticsearch的方法示例的文章就讲到这里了,如果你想了解更多关于nginx日志导入elasticsearch的方法示例的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我在这里有一个问题,我不知道这是否正常。 但是我认为这里有些湖,安装插件elasticsearch-head之后,我在浏览器中启动url“http://localhost:9200/_plugin/h
我写了这个 flex 搜索查询: es.search(index=['ind1'],doc_type=['doc']) 我得到以下结果: {'_shards': {'failed': 0, 'skip
在ElasticSearch.Net v.5中,存在一个属性 Elasticsearch.Net.RequestData.Path ,该属性在ElasticSearch.Net v.6中已成为depr
如何让 elasticsearch 应用新配置?我更改了文件 ~ES_HOME/config/elasticsearch.yml 中的一个字符串: # Disable HTTP completely:
我正在尝试使用以下分析器在 elastic serach 7.1 中实现部分子字符串搜索 PUT my_index-001 { "settings": { "analysis": {
假设一个 elasticsearch 服务器在很短的时间内接收到 100 个任务。有些任务很短,有些任务很耗时,有些任务是删除任务,有些是插入和搜索查询。 elasticsearch 是如何决定先运行
我需要根据日期过滤一组值(在此处添加字段),然后按 device_id 对其进行分组。所以我正在使用以下东西: { "aggs":{ "dates_between":{ "fi
我在 Elasticsearch 中有一个企业索引。索引中的每个文档代表一个业务,每个业务都有business_hours。我试图允许使用星期几和时间过滤营业时间。例如,我们希望能够进行过滤,以显示我
我有一个这样的过滤查询 query: { filtered: { query: { bool: { should: [{multi_match: {
Elasticsearch 相当新,所以可能不得不忍受我,我遇到了一个问题,如果我使用 20 个字符或更少的字符搜索文档,文档会出现,但是查询中同一个单词中的任何更多字符,我没有结果: 使用“苯氧甲基
我试图更好地理解 ElasticSearch 的内部结构,所以我想知道 ElasticSearch 在内部计算以下两种情况的术语统计信息的方式是否存在任何差异。 第一种情况是当我有这样的文件时: {
在我的 elasticsearch 索引中,我索引了一堆工作。为简单起见,我们只说它们是一堆职位。当人们在我的搜索引擎中输入职位时,我想“自动完成”可能的匹配。 我在这里调查了完成建议:http://
我在很多映射中使用多字段。在 Elastic Search 的文档中,指示应将多字段替换为“fields”参数。参见 http://www.elasticsearch.org/guide/en/ela
我有如下查询, query = { "query": {"query_string": {"query": "%s" % q}}, "filter":{"ids
我有一个Json数据 "hits": [ { "_index": "outboxprov1", "_type": "deleted-c
这可能是一个初学者的问题,但我对大小有一些疑问。 根据 Elasticsearch 规范,大小的最大值可以是 10000,我想在下面验证我的理解: 示例查询: GET testindex-2016.0
我在 Elastic Search 中发现了滚动功能,这看起来非常有趣。看了那么多文档,下面的问题我还是不清楚。 如果偏移量已经存在那么为什么要使用滚动? 即将到来的记录呢?假设它完成了所有数据的滚动
我有以下基于注释的 Elasticsearch 配置,我已将索引设置为不被分析,因为我不希望这些字段被标记化: @Document(indexName = "abc", type = "efg
我正在尝试在单个索引中创建多个类型。例如,我试图在host索引中创建两种类型(post,ytb),以便在它们之间创建父子关系。 PUT /ytb { "mappings": { "po
我尝试创建一个简单的模板,包括一些动态模板,但我似乎无法为文档编制索引。 我得到错误: 400 {"error":"MapperParsingException[mapping [_default_]
我是一名优秀的程序员,十分优秀!