gpt4 book ai didi

elasticsearch - 将日志条目与 logstash 相结合

转载 作者:行者123 更新时间:2023-12-03 01:01:46 29 4
gpt4 key购买 nike

我想从 dnsmasq 收集和处理日志,我决定使用 ELK。 Dnsmasq 用作 DHCP 服务器和 DNS 解析器,因此它为这两种服务创建日志条目。

我的目标是向 Elasticsearch 发送带有请求者 IP、请求者主机名(如果可用)和请求者 mac 地址的所有 DNS 查询。这将允许我对每个 MAC 地址的请求进行分组,无论设备 IP 是否更改,并显示主机名。

我想做的是以下几点:

1)阅读以下条目:

Mar 30 21:55:34 dnsmasq-dhcp[346]: 3806132383 DHCPACK(eth0)  192.168.0.80 04:0c:ce:d1:af:18 air

2)暂存关系:

192.168.0.80 => 04:0c:ce:d1:af:18

192.168.0.80 => 空气

3) 丰富如下条目,添加mac地址和主机名。如果主机名是空的,我会添加 mac 地址。
Mar 30 22:13:05 dnsmasq[346]: query[A] imap.gmail.com from 192.168.0.80

我找到了一个名为 “memorize” 的模块这将允许我存储它们,但不幸的是不适用于最新版本的 Logstash

我正在使用的版本:
ElastiSearch 2.3.0
Kibana 4.4.2
Logstash 2.2.2

还有logstash过滤器(这是我第一次尝试logstash,因此我确信配置文件可以改进)
input {
file {
path => "/var/log/dnsmasq.log"
start_position => "beginning"
type => "dnsmasq"
}
}

filter {
if [type] == "dnsmasq" {
grok {
match => [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{NOTSPACE:action} %{IP:clientip} %{MAC:clientmac} ?(%{HOSTNAME:clientname})?"]
match => [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{USER:action}?(\[%{USER:subaction}\])? %{NOTSPACE:domain} %{NOTSPACE:function} %{IP:clientip}"]
match => [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: %{NOTSPACE:action} %{DATA:data}"]
}

if [action] =~ "DHCPACK" {

}else if [action] == "query" {

}else
{
drop{}
}
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

问题:

1) 有没有插件 “memorize” 的替代品?使用最新的 logstash 版本?另一个插件或不同的程序。

2)我应该将logstash降级到2之前的版本(我认为之前是1.5.4)?如果是这样,是否有任何已知的服务器问题或与 elasticsearch 2.2.1 不兼容?

3)或者我应该修改允许logstash 2.x的插件“内存”(如果是这样,我会感谢任何关于如何开始的指针)?

最佳答案

无需重新包装memorize在我看来这个插件。您可以使用 aggregate filter达到你想要的。

...

# record host/mac in temporary map
if [action] =~ "DHCPACK" {
aggregate {
task_id => "%{clientip}"
code => "map['clientmac'] = event['clientmac']; map['clientname'] = event['clientname'];"
map_action => "create_or_update"
# timeout set to 48h
timeout => 172800
}
}

# add host/mac where/when needed
else if [action] == "query" {
aggregate {
task_id => "%{clientip}"
code => "event['clientmac'] = map['clientmac']; event['clientname'] = map['clientname']"
map_action => "update"
}
}

关于elasticsearch - 将日志条目与 logstash 相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36360674/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com