
regex - How to read custom multi-line logs with Spark


I am trying to parse a custom log file with Spark using regex patterns:

My log file:

2018-04-11 06:27:36 localhost debug: localhost received discover from 0.0.0.0
2018-04-11 06:27:36 localhost debug:     sec = 0.4
2018-04-11 06:27:36 localhost debug:     Msg-Type = text
2018-04-11 06:27:36 localhost debug:     Content = XXXXXXXXXX
2018-04-11 06:27:34 localhost debug: localhost sending response to 0.0.0.0
2018-04-11 06:27:34 localhost debug:     sec = 0.3
2018-04-11 06:27:34 localhost debug:     Msg-Type = text
2018-04-11 06:27:34 localhost debug:     Content = XXXXXXXXXX
...

Here is my code snippet:

case class Rlog(dateTime: String, server_name: String, log_type: String, server_addr:String, action: String, target_addr:String, cost:String, msg_type:String, content:String)
case class Slog(dateTime: String, server_name: String, log_type: String, server_addr:String, action: String, target_addr:String, msg_type:String, content:String)

val pattern_1 = """([\w|\s|\:|-]{19})\s([a-z]+)\s(\w+):\s(\w+)\sreceived\s(\w+)\sfrom\s([\.|\w]+)""".r
val pattern_2 = """([\w|\s|\:|-]{19})\s([a-z]+)\s(\w+):\s{5}([\w|-]+)\s=\s([\.|\w]+)""".r
val pattern_3 = """([\w|\s|\:|-]{19})\s([a-z]+)\s(\w+):\s(\w+)\ssending\s(\w+)\sto\s([\.|\w]+)""".r

sc.textFile("/directory/logfile").map(?????)

Is there any way to do this?

Best Answer

You can use pattern.unapplySeq(string) inside map to get a List of all the groups the regex matched.

For example, if you have the string:

val str = "2018-04-11 06:27:36 localhost debug: localhost received discover from 0.0.0.0"

and then run:

pattern_1.unapplySeq(str)

you will get:

 Option[List[String]] = Some(List(2018-04-11 06:27:36, localhost, debug, localhost, discover, 0.0.0.0))
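
As a minimal sketch of how this composes into a Spark transformation (assuming pattern_1 is compiled with .r as in the question; the name receivedGroups is illustrative), flatMapping over the returned Option keeps only the lines that match:

val receivedGroups = sc.textFile("/directory/logfile")
  .flatMap(line => pattern_1.unapplySeq(line)) // non-matches yield None and are dropped
// receivedGroups: RDD[List[String]], one list of captured groups per matching line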

I worked this out using your example. This answer assumes that a given log entry and its associated Msg-Type, Content, and sec lines are all printed with the same timestamp.

// case class definitions here
// regex pattern_1, pattern_2, pattern_3 defined here

val rdd = sc.textFile("file").cache

// Split into 3 RDDs based on which pattern matches each line
// (.get is safe here because the preceding filter guarantees a match)
val receivedRdd = rdd.filter(_.matches(pattern_1.toString)).map(pattern_1.unapplySeq(_).get)
val sentRdd = rdd.filter(_.matches(pattern_3.toString)).map(pattern_3.unapplySeq(_).get)
val otherRdd = rdd.filter(_.matches(pattern_2.toString)).map(pattern_2.unapplySeq(_).get)

// Convert to DataFrames; the column names match the Rlog and Slog
// case classes to make the later conversion to Datasets straightforward

val receivedDF = receivedRdd.map{ case List(a,b,c,d,e,f) => (a,b,c,d,e,f)}
.toDF("dateTime" , "server_name", "log_type", "server_addr", "action", "target_addr")

val sentDF = sentRdd.map{ case List(a,b,c,d,e,f) => (a,b,c,d,e,f)}
.toDF("dateTime" , "server_name", "log_type", "server_addr", "action", "target_addr")

// Collapse the multiple key/value rows (sec, Msg-Type, Content) into one row per timestamp using pivot
val otherDF = otherRdd.map{ case List(ts , srvr, typ, i1 , i2) => (ts , srvr, typ, i1 , i2) }
.toDF("dateTime" , "server_name", "log_type", "i1" , "i2")
.groupBy("dateTime" , "server_name", "log_type")
.pivot("i1").agg(first($"i2") )
.select($"dateTime", $"server_name", $"log_type", $"sec".as("cost") , $"Msg-Type".as("msg_type"), $"Content".as("content"))

otherDF.show
//+-------------------+-----------+--------+----+--------+----------+
//| dateTime|server_name|log_type|cost|msg_type| content|
//+-------------------+-----------+--------+----+--------+----------+
//|2018-04-11 06:27:34| localhost| debug| 0.3| text|XXXXXXXXXX|
//|2018-04-11 06:27:36| localhost| debug| 0.4| text|XXXXXXXXXX|
//+-------------------+-----------+--------+----+--------+----------+
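
// To see what the groupBy + pivot step does in isolation, here is a minimal,
// hypothetical example (not part of the original answer) with the same column names:
val kv = Seq(
  ("2018-04-11 06:27:36", "sec", "0.4"),
  ("2018-04-11 06:27:36", "Msg-Type", "text"),
  ("2018-04-11 06:27:36", "Content", "XXXXXXXXXX")
).toDF("dateTime", "i1", "i2")

// pivot("i1") promotes each distinct key to its own column,
// and first($"i2") selects that key's value within the group
kv.groupBy("dateTime").pivot("i1").agg(first($"i2")).show
// +-------------------+----------+--------+---+
// |           dateTime|   Content|Msg-Type|sec|
// +-------------------+----------+--------+---+
// |2018-04-11 06:27:36|XXXXXXXXXX|    text|0.4|
// +-------------------+----------+--------+---+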

// Finally join based on dateTime, server_name and log_type and convert to Datasets

val RlogDS = receivedDF.join(otherDF, Seq("dateTime" , "server_name", "log_type")).as[Rlog]
val SlogDS = sentDF.join(otherDF, Seq("dateTime" , "server_name", "log_type")).as[Slog]

RlogDS.show(false)
//+-------------------+-----------+--------+-----------+--------+-----------+----+--------+----------+
//| dateTime|server_name|log_type|server_addr| action|target_addr|cost|msg_type| content|
//+-------------------+-----------+--------+-----------+--------+-----------+----+--------+----------+
//|2018-04-11 06:27:36| localhost| debug| localhost|discover| 0.0.0.0| 0.4| text|XXXXXXXXXX|
//+-------------------+-----------+--------+-----------+--------+-----------+----+--------+----------+

SlogDS.show(false)
//+-------------------+-----------+--------+-----------+--------+-----------+----+--------+----------+
//|dateTime |server_name|log_type|server_addr|action |target_addr|cost|msg_type|content |
//+-------------------+-----------+--------+-----------+--------+-----------+----+--------+----------+
//|2018-04-11 06:27:34|localhost |debug |localhost |response|0.0.0.0 |0.3 |text |XXXXXXXXXX|
//+-------------------+-----------+--------+-----------+--------+-----------+----+--------+----------+
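
For reference, the snippets above assume a spark-shell session, where spark, sc, the SQL implicits, and functions such as first are already in scope. Run as a standalone application, the setup would look roughly like this sketch (the app name and master are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

val spark = SparkSession.builder().appName("multiline-log-parser").master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._ // enables .toDF, the $"col" syntax, and .as[T]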

Regarding "regex - How to read custom multi-line logs with Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/50406734/
