gpt4 book ai didi

elasticsearch - 按字段关联 ELK 中的消息

转载 作者:行者123 更新时间:2023-11-29 02:55:42 27 4
gpt4 key购买 nike

相关:Combine logs and query in ELK

我们正在设置 ELK 并希望在 Kibana 4 中创建可视化。这里的问题是我们想要关联两种不同类型的消息。

简化:

  • 消息类型 1 字段:message_type、common_id_number、byte_count、...
  • 消息类型 2 字段:message_type、common_id_number、主机名……

两条消息在 elasticsearch 中共享相同的索引。

enter image description here

如您所见,我们试图在不考虑 common_id_number 的情况下绘制图表,但看来我们必须使用它。不过,我们还不知道怎么做。

有什么帮助吗?

编辑

这些是 ES 模板中的相关字段定义:

      "URIHost" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"Type" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"SessionID" : {
"type" : "long"
},
"Bytes" : {
"type" : "long"
},
"BytesReceived" : {
"type" : "long"
},
"BytesSent" : {
"type" : "long"
},

这是一个 TRAFFIC 类型的编辑文档:

{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqdBjpQiRid-uxPjE",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:55.543Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:58",
"Type": "TRAFFIC",
"SessionID": 21713,
"Bytes": 939,
"BytesSent": 480,
"BytesReceived": 459,
},
"fields": {
"@timestamp": [
1446760795543
]
},
"sort": [
1446760795543
]
}

这是一份威胁类型的文件:

{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqVNIpQiRid-uxPjC",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:23.440Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:26",
"Type": "THREAT",
"SessionID": 21713,
"URIHost": "whatever.nevermind.com",
"URIPath": "/connectiontest.html"
},
"fields": {
"@timestamp": [
1446760763440
]
},
"sort": [
1446760763440
]
}

这是 logstash“过滤器”配置:

filter {
if [type] == "paloalto" {
syslog_pri {
remove_field => [ "syslog_facility", "syslog_severity" ]
}

grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}"
}
remove_field => [ "message" ]
}

if [Type] == "THREAT" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
grok {
match => {
"URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?"
}
remove_field => [ "URL" ]
}
}

else if [Type] == "TRAFFIC" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
"Bytes" => "integer"
"BytesSent" => "integer"
"BytesReceived" => "integer"
"ElapsedTimeInSecs" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
}

date {
match => [ "syslog_timastamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
timezone => "CET"
remove_field => [ "syslog_timestamp" ]
}
}
}

我们正在尝试做的是将 URIHost 术语可视化为 X 轴,将字节、BytesSent 和 BytesReceived 总和可视化为 Y 轴。

最佳答案

我想你可以使用 aggregate filter执行你的任务。 aggregate 过滤器支持基于公共(public)字段值将多个日志行聚合到一个事件中。在您的情况下,我们将要使用的公共(public)字段将是 SessionID 字段。

然后我们需要另一个字段来检测第一个事件与应该聚合的第二个/最后一个事件。在您的情况下,这将是 Type 字段。

您需要像这样更改当前配置:

filter {

... all other filters

if [Type] == "THREAT" {
... all other filters

aggregate {
task_id => "%{SessionID}"
code => "map['URIHost'] = event['URIHost']; map['URIPath'] = event['URIPath']"
}
}

else if [Type] == "TRAFFIC" {
... all other filters

aggregate {
task_id => "%{SessionID}"
code => "event['URIHost'] = map['URIHost']; event['URIPath'] = map['URIPath']"
end_of_task => true
timeout => 120
}
}
}

大体思路是,当Logstash遇到THREAT日志时,会暂时将URIHostURIPath存储在内存事件映射中,然后当 TRAFFIC 日志进入时,URIHostURIPath 字段将添加到事件中。如果需要,您也可以复制其他字段。您还可以根据您预计 TRAFFIC 事件在最后一个 THREAT 事件发生后的多长时间来调整超时(以秒为单位)。

最后,您将获得包含来自 THREATTRAFFIC 日志行的合并数据的文档,您可以轻松创建可视化显示每个 的字节数URIHost 如您的屏幕截图所示。

关于elasticsearch - 按字段关联 ELK 中的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33549171/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com