gpt4 book ai didi

python - 使用 Spark 检查日志

转载 作者:太空宇宙 更新时间:2023-11-04 05:17:19 25 4
gpt4 key购买 nike

我是 Spark 的新手,我正在尝试开发一个 python 脚本来读取带有一些日志的 csv 文件:

userId,timestamp,ip,event
13,2016-12-29 16:53:44,86.20.90.121,login
43,2016-12-29 16:53:44,106.9.38.79,login
66,2016-12-29 16:53:44,204.102.78.108,logoff
101,2016-12-29 16:53:44,14.139.102.226,login
91,2016-12-29 16:53:44,23.195.2.174,logoff

并检查用户是否有一些奇怪的行为,例如他是否连续两次“登录”而没有“注销”。我已将 csv 加载为 Spark 数据帧,我想比较单个用户的日志行,按时间戳排序并检查两个连续事件是否属于同一类型(登录 - 登录,注销 - 注销)。我正在寻找以“map-reduce”方式进行的操作,但目前我无法弄清楚如何使用比较连续行的 reduce 函数。我编写的代码有效,但性能很差。

sc = SparkContext("local","Data Check")
sqlContext = SQLContext(sc)

LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"
N_USERS = 10*1000

dataFrame = sqlContext.read.format("com.databricks.spark.csv").load(LOG_FILE_PATH)
dataFrame = dataFrame.selectExpr("C0 as userID","C1 as timestamp","C2 as ip","C3 as event")

wrongUsers = []

for i in range(0,N_USERS):

userDataFrame = dataFrame.where(dataFrame['userId'] == i)
userDataFrame = userDataFrame.sort('timestamp')

prevEvent = ''

for row in userDataFrame.rdd.collect():

currEvent = row[3]
if(prevEvent == currEvent):
wrongUsers.append(row[0])

prevEvent = currEvent

badUsers = sqlContext.createDataFrame(wrongUsers)
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

最佳答案

首先(不相关但仍然),确保每个用户的条目数不是那么大,因为 collect in for row in userDataFrame.rdd.collect(): 是危险的。

其次,您无需离开此处的DataFrame区域即可使用经典Python,只需坚持使用Spark。

现在,你的问题。它基本上是“对于每一行,我想从上一行了解一些东西”:属于 Window 函数的概念,准确地说是 lag 函数。这里有两篇关于 Spark 中窗口函数的有趣文章:一篇来自 Databricks使用 Python 代码和来自 Xinh 的代码使用(我认为更容易理解)Scala 中的示例。

我在 Scala 中有一个解决方案,但我认为您可以将其翻译成 Python:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

import sqlContext.implicits._

val LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
val RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"

val data = sqlContext
.read
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true") // use the header from your csv
.load(LOG_FILE_PATH)

val wSpec = Window.partitionBy("userId").orderBy("timestamp")

val badUsers = data
.withColumn("previousEvent", lag($"event", 1).over(wSpec))
.filter($"previousEvent" === $"event")
.select("userId")
.distinct

badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

基本上,您只需检索上一行的值并将其与当前行的值进行比较,如果匹配则为错误行为,您将保留 userId。对于每个 userId 行“ block ”中的第一行,先前的值将为 null:与当前值比较时, bool 表达式将为 false 所以这里没有问题。

关于python - 使用 Spark 检查日志,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41386387/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com