- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
相关:Combine logs and query in ELK
我们正在设置 ELK 并希望在 Kibana 4 中创建可视化。这里的问题是我们想要关联两种不同类型的消息。
简化:
两条消息在 elasticsearch 中共享相同的索引。
如您所见,我们试图在不考虑 common_id_number 的情况下绘制图表,但看来我们必须使用它。不过,我们还不知道怎么做。
有什么帮助吗?
编辑
这些是 ES 模板中的相关字段定义:
"URIHost" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"Type" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"SessionID" : {
"type" : "long"
},
"Bytes" : {
"type" : "long"
},
"BytesReceived" : {
"type" : "long"
},
"BytesSent" : {
"type" : "long"
},
这是一个 TRAFFIC 类型的编辑文档:
{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqdBjpQiRid-uxPjE",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:55.543Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:58",
"Type": "TRAFFIC",
"SessionID": 21713,
"Bytes": 939,
"BytesSent": 480,
"BytesReceived": 459,
},
"fields": {
"@timestamp": [
1446760795543
]
},
"sort": [
1446760795543
]
}
这是一份威胁类型的文件:
{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqVNIpQiRid-uxPjC",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:23.440Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:26",
"Type": "THREAT",
"SessionID": 21713,
"URIHost": "whatever.nevermind.com",
"URIPath": "/connectiontest.html"
},
"fields": {
"@timestamp": [
1446760763440
]
},
"sort": [
1446760763440
]
}
这是 logstash“过滤器”配置:
filter {
if [type] == "paloalto" {
syslog_pri {
remove_field => [ "syslog_facility", "syslog_severity" ]
}
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}"
}
remove_field => [ "message" ]
}
if [Type] == "THREAT" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
grok {
match => {
"URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?"
}
remove_field => [ "URL" ]
}
}
else if [Type] == "TRAFFIC" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
"Bytes" => "integer"
"BytesSent" => "integer"
"BytesReceived" => "integer"
"ElapsedTimeInSecs" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
}
date {
match => [ "syslog_timastamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
timezone => "CET"
remove_field => [ "syslog_timestamp" ]
}
}
}
我们正在尝试做的是将 URIHost 术语可视化为 X 轴,将字节、BytesSent 和 BytesReceived 总和可视化为 Y 轴。
最佳答案
我想你可以使用 aggregate
filter执行你的任务。 aggregate
过滤器支持基于公共(public)字段值将多个日志行聚合到一个事件中。在您的情况下,我们将要使用的公共(public)字段将是 SessionID
字段。
然后我们需要另一个字段来检测第一个事件与应该聚合的第二个/最后一个事件。在您的情况下,这将是 Type
字段。
您需要像这样更改当前配置:
filter {
... all other filters
if [Type] == "THREAT" {
... all other filters
aggregate {
task_id => "%{SessionID}"
code => "map['URIHost'] = event['URIHost']; map['URIPath'] = event['URIPath']"
}
}
else if [Type] == "TRAFFIC" {
... all other filters
aggregate {
task_id => "%{SessionID}"
code => "event['URIHost'] = map['URIHost']; event['URIPath'] = map['URIPath']"
end_of_task => true
timeout => 120
}
}
}
大体思路是,当Logstash遇到THREAT
日志时,会暂时将URIHost
和URIPath
存储在内存事件映射中,然后当 TRAFFIC
日志进入时,URIHost
和 URIPath
字段将添加到事件中。如果需要,您也可以复制其他字段。您还可以根据您预计 TRAFFIC
事件在最后一个 THREAT
事件发生后的多长时间来调整超时(以秒为单位)。
最后,您将获得包含来自 THREAT
和 TRAFFIC
日志行的合并数据的文档,您可以轻松创建可视化显示每个 的字节数URIHost
如您的屏幕截图所示。
关于elasticsearch - 按字段关联 ELK 中的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33549171/
最近几天,我们考虑使用 Solr 作为我们的首选搜索引擎。 我们需要的大多数功能都是开箱即用的,或者可以轻松配置。 然而,我们绝对需要的一项功能似乎在 Solr 中被很好地隐藏(或缺失)了。 我会试着
我是 Sequelize 的新手,并且一直在探索关联。我正在使用 mysql 5.6 并 Sequelize ^4.42.0。我正在尝试创建两个简单的表:PRJS 和 TASKS 并将一些数据插入这些
关联、聚合和组合之间有什么区别?请从实现的角度解释一下。 最佳答案 对于两个对象,Foo 和 Bar 可以定义关系 关联 - 我与一个对象有关系。 Foo 使用 Bar public class Fo
这两种 hasOne 语法有什么区别? class Project { ....... ............ static hasOne = Employee // static h
对于当前的项目,我想使用遗传算法 - 目前我查看了 jenetics 库。 如何强制某些基因相互依赖?我想将 CSS 映射到基因上,例如我有基因指示是否显示图像,以及如果它也是各自的高度和宽度。因此,
关联、聚合和组合之间有什么区别?请从实现的角度解释一下。 最佳答案 对于两个对象,Foo 和 Bar 可以定义关系 关联 - 我与一个对象有关系。 Foo 使用 Bar public class Fo
假设我有一个名为“学生”的表格,其中包含姓名、手机、电子邮件、首选类(class)、首选学校、性别、年龄、地址、资格、职称、家庭电话、工作电话等列 我想从 Students 表中选择数据并插入到 2
问题标题有点困惑。我有一级员工和一级项目。一名或多名员工正在从事一个或多个项目。在这个关联中,我只有一个从具有*多重性的员工类到具有*多重性的项目类的链接。现在有另一种实现。每个项目只有一名经理,属于
到目前为止,我有一个程序采用一组随机点、站点,并围绕这些点形成适当的 Voronoi 图,表示为角和边的图形。它还为我提供了 Delaunay 三角剖分作为另一个以所有站点为节点的图形(尽管我不知道这
实现IComMethodEvents时你得到三个事件。 OnMethodCall OnMethodException OnMethodReturn 我的目标是记录 COM+ 组件中每个方法的调用时间。
我正在处理这个问题。我正在创造数学问题,每一个都有回应。例如。 如果我的问题是关于“5x + 15 = 2 的结果?”,我将只等待一个答案(整数)。 如果我的问题是关于“给我这个形状的面积和许可”,我
我正在寻找一种数据结构来保存唯一元素的无序集合,它将支持以下操作 在集合中任意位置插入/删除元素 查询元素是否存在 访问一个随机元素 天真地,1 和 2 建议使用关联容器,例如unordered_se
是否可以在 LINQ 中使用类似 ContactAddress.Contact 的内容,而无需在 SQL Server 中在这两者之间创建外键关系(通过 Contact.Id ContactAddr
我一直在谷歌搜索,但不明白调用 javax.persistence.criteria.Subquery 和 Criteria API 的方法相关的结果是什么。 http://www.objectdb.
我正在关注 Chris McCord 的“Programming Phoenix”一书,在第 6 章中,在 User 之间创建了一个关系。和一个 Video . 尝试使用 mix phoenix.se
我在 XAML 中有一个 ItemsControl,我在其中为每个组显示一个扩展器,以便我可以展开/折叠该组。我想保持 IsExpanded 的状态属性(以及可能与组标题显示相关的其他设置)。通常你只
Oracle 11 中是否有内置方法来检查 varchar2 字段中值的相关性?例如,给定一个简单的表,如下所示: MEAL_NUM INGREDIENT --------------------
是否可以在没有 JPA 在数据库中创建外键的情况下设置多对一关联? 这些表归另一个系统所有,并以异步方式填充。因此我们不能在数据库中使用 FK。仍然,几乎总是,最终是一种关系。 @ManyToOne(
我一直在使用NHibernate,使用Fluent NHibernate进行映射。我解决了很多问题,并开始认为自己在nhibernate中经验丰富。 但是,此错误非常奇怪。 这是我的模型: p
我正在开发一个 Typescript Sequelize 项目,其中我的 /models/index.ts 文件具有以下“导入此目录中的所有模型”功能: var basename = path.bas
我是一名优秀的程序员,十分优秀!