- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
摘要: 本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。
本文分享自华为云社区《 【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】 》,作者:上进小菜猪.
随着互联网的普及和应用范围的扩大,越来越多的应用场景需要对海量数据进行高效地处理和分析,这就要求我们必须具备大数据技术方面的知识和技能。本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解.
我们的项目是针对某购物网站的访问日志进行分析,其中主要包含以下几个字段:
原始数据规模约为 100GB,我们需要对其进行清洗、统计和分析,以得到有用的信息和价值.
由于原始数据存在缺失值、异常值、重复值等问题,因此我们需要进行数据清洗,主要包括以下步骤:
具体代码实现如下:
import org.apache.spark.{SparkConf, SparkContext} import java.text.SimpleDateFormat import java.util.Locale object DataCleaning { def main(args: Array[String]) { val conf = new SparkConf().setAppName( " DataCleaning " ) val sc = new SparkContext(conf) val data = sc.textFile( " hdfs://master:9000/log/access.log " ) // 定义时间格式及地区信息 val dateFormat = new SimpleDateFormat( " dd/MMM/yyyy:HH:mm:ss Z " , Locale.ENGLISH) // 数据清洗 val cleanData = data.map(line => { val arr = line.split( " " ) if (arr.length >= 9 ) { // 解析 IP val ip = arr( 0 ) // 解析时间,转换为 Unix 时间戳 val time = dateFormat.parse(arr( 3 ) + " " + arr( 4 )).getTime / 1000 // 解析 URL val url = urlDecode(arr( 6 )) // 解析 UserAgent val ua = arr( 8 ) (ip, time, url, ua) } }).filter(x => x != null ).distinct() // 结果输出 cleanData.saveAsTextFile( " hdfs://master:9000/cleanData " ) sc.stop() } // URL 解码 def urlDecode(url: String): String = { java.net.URLDecoder.decode(url, " utf-8 " ) } }
对于大规模数据的处理,我们可以使用 Spark 提供的强大的分布式计算能力,以提高处理效率和减少计算时间.
我们这里使用 Spark SQL 统计每个 URL 的访问量,并输出前 10 个访问量最高的 URL,代码如下:
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext case class LogRecord(ip: String, time: Long, url: String, ua: String) object DataAnalysis { def main(args: Array[String]) { val conf = new SparkConf().setAppName( " DataAnalysis " ) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) // 读取清洗后的数据 val cleanData = sc.textFile( " hdfs://master:9000/cleanData " ).filter(x => x != null ) // 将数据转换为 DataFrame import sqlContext.implicits._ val logDF = cleanData.map(_.split( " , " )).map(p => LogRecord(p( 0 ), p( 1 ).toLong, p( 2 ), p( 3 ))).toDF() // 统计每个 URL 的访问量,并按访问量降序排序 val topUrls = logDF.groupBy( " url " ).count().sort($ " count " .desc) // 输出前 10 个访问量最高的 URL topUrls.take( 10 ). foreach (println) sc.stop() } }
数据可视化是将处理和分析后的数据以图表或图像的方式展示出来,有利于我们直观地观察数据的规律和趋势.
我们这里采用 Python 的 Matplotlib 库将前 10 个访问量最高的 URL 可视化,代码如下:
import matplotlib.pyplot as plt # 读取数据 with open( ' topUrls.txt ' , ' r ' ) as f: line = f.readline() urls = [] counts = [] while line and len(urls) < 10 : url, count = line.strip().split( ' , ' ) urls.append(url) counts.append( int (count)) line = f.readline() # 绘制直方图 plt.bar(range( 10 ), counts, align= ' center ' ) plt.xticks(range( 10 ), urls, rotation= 90 ) plt.xlabel( ' Url ' ) plt.ylabel( ' Count ' ) plt.title( ' Top 10 Url ' ) plt.show()
在进行数据清洗前,需要先对原始日志数据进行筛选,选取需要分析的字段。然后进行数据清洗,去掉不必要的空格、特殊字符等,使数据更加规整,并增加可读性.
下面是数据清洗的代码示例:
val originalRdd = spark.sparkContext.textFile( " path/to/logfile " ) val filteredRdd = originalRdd.filter(line => { val tokens = line.split( " \t " ) tokens.length >= 10 && tokens( 0 ).matches( " \d{4}-\d{2}-\d{2} " ) && tokens( 1 ).matches( " \d{2}:\d{2}:\d{2} " ) && tokens( 2 ).matches( " \d+ " ) && tokens( 3 ).matches( " \d+ " ) && tokens( 4 ).matches( " \d+ " ) && tokens( 5 ).matches( " \d+ " ) && tokens( 6 ).matches( " .+ " ) && tokens( 7 ).matches( " .+ " ) && tokens( 8 ).matches( " .+ " ) && tokens( 9 ).matches( " .+ " ) }) val cleanedRdd = filteredRdd.map(line => { val tokens = line.split( " \t " ) val timestamp = s " ${tokens(0)} ${tokens(1)} " val request = tokens( 6 ).replaceAll( """ , "" ) val responseCode = tokens( 8 ).toInt (timestamp, request, responseCode) })
在上述代码中,我们首先读取原始日志数据,并使用filter函数过滤掉不符合条件的行;然后使用map函数将数据转换为元组的形式,并进行清洗。其中,元组的三个元素分别是时间戳、请求内容和响应状态码.
接下来,让我们来介绍一下如何使用Spark进行数据统计.
数据统计是大规模数据分析中非常重要的一个环节。Spark提供了丰富的聚合函数,可用于对数据进行各种统计分析.
下面是对清洗后的数据进行统计分析的代码示例:
import org.apache.spark.sql.functions._ val df = spark.createDataFrame(cleanedRdd).toDF( " timestamp " , " request " , " responseCode " ) val totalCount = df.count() val errorsCount = df.filter(col( " responseCode " ) >= 400 ).count() val successCount = totalCount - errorsCount val topEndpoints = df.groupBy( " request " ).count().orderBy(desc( " count " )).limit( 10 ) topEndpoints.show()
在上面的代码中,我们首先将清洗后的数据转换为DataFrame,然后使用count函数计算总记录数和错误记录数,并计算成功记录数。最后使用groupBy和orderBy函数按照请求内容,对数据进行分组统计,并打印出请求次数最多的前10个端点.
通过可视化,我们可以清楚地看到前 10 个访问量最高的 URL 地址及其访问量,这对于进一步分析和优化网站的性能和用户体验具有重要的意义.
总结起来,这就是我们的一个大数据实战项目,我们使用 Spark 统计了购物网站的访问量,并通过 Python 的 Matplotlib 库将结果可视化。这个过程中,我们运用了数据清洗、Spark SQL 统计和可视化等技术,为大规模数据的处理和分析提供了有效的解决方案.
。
点击关注,第一时间了解华为云新鲜技术~ 。
最后此篇关于基于Spark的大规模日志分析的文章就讲到这里了,如果你想了解更多关于基于Spark的大规模日志分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
这是真的: log(A) + log(B) = log(A * B) [0] 这也是真的吗? O(log(A)) + O(log(B)) = O(log(A * B)) [1] 据我了解 O(f
0 引言 我常以为 配置 INFO 日志级别时, 应用程序代码中日志器(logger) debug 级的日志代码,不会被执行(比如,实验1中的printTestLog函数)。但今天线上的问题,
日志 日志是构建工具的主要界面。如果日志太多,真正的警告和问题容易被隐藏。另一方面,如果出了错,你需要找出相关的信息。Gradle 定义了6个日志级别,如表 18.1,“日志级别”所示。除了那些您通
日志 关键进程日志如下…(将 替换为启动服务的用户,将 替换为计算机名称) NameNode: $ HADOOP_HOME / logs / hadoop- -namenode- .log Da
我正在探索项目的 git 历史 FFMpeg .我在提交之间对每个文件执行了更改 517573a67088b5c7a25c18373434e3448892ee93和 80bb65fafab1d2f5f
我不知道如何在 loggly 中使用正则表达式进行搜索。例如,使用表达式 /24nonstop.+7554/ 记录我想查找的内容. { "level_name": "WARNING", "ex
有没有办法为 API 调用打开日志记录? 我们有一个第三方应用程序在使用我们的商店时遇到问题,希望获得一些调试信息。 ~我已经搜索了 bt 一无所获。 我正在使用 1.7 最佳答案 在一段受控的时间内
我正在尝试获取 SVN 中所有副本/移动/等的固定路径的日志历史记录(如果可能的话,递归地)。实际上,我试图避免 peg revisions ,并将日志应用于路径而不是对象。 svn 手册提出了这个问
如何在命令行中运行 NAnt 脚本并在日志文件中获取每个任务的时间? using nant task or NAnt -buildfile:testscript.build testnanttarg
是否有任何默认方式来记录哪些用户代理访问了您的服务器?我需要编制一份访问我们网站的浏览器列表,以便我们知道我们最能支持什么。 谢谢! 最佳答案 日志CGI.HTTP_USER_AGENT ,也许在 A
我在我的应用程序中使用 Spring 发送电子邮件。 我想在发送电子邮件时记录 imap 服务器操作。 我尝试按如下方式在我的 applicationContext.xml 中实现日志:
我已经运行一个 pod 一个多星期了,从开始到现在没有重启过。但是,我仍然无法查看自它启动以来的日志,它只提供最近两天的日志。容器是否有任何日志轮换策略以及如何根据大小或日期控制轮换? 我尝试了以下命
背景: 我正在设置我的第一个 flex 堆栈,尽管我将开始简单,但是我想确保我从良好的体系结构开始。我最终希望有以下解决方案:托管指标,服务器日志(expressjs APM),单页应用程序监视(AP
常规的 hg log 命令给出每个变更集至少 4 行的输出。例如 changeset: 238:03a214f2a1cf user: My Name date: Th
我在我的项目中使用 Spring iBatis 框架。然后使用 logback 进行记录。然后,在检查日志文件时,我可以看到系统正在使用的数据库...出于安全目的我想隐藏它 这是示例日志.. 12:2
我想使用 hg log 生成一个简短的变更日志,涵盖最新版本的变更。发行版标有“v”前缀,例如“v0.9.1”或“v1.0”。是否可以使用 revsets 选择以“v”开头的最后两个标签之间的范围,不
我是 PHP 的新手,所以如果有一个简单的答案,请原谅我。我在 stackoverflow 中搜索过任何类似的问题,但找不到任何帮助。 我正在开发一个现有的基于 php 的应用程序,我只需要能够将对象
我有一个名为 Radius 的程序可以验证用户登录。运行在CentOS服务器上 日志在/var/log/radius.log 中 它们如下 Mon Jul 24 22:17:08 2017 : Aut
我最近从使用“日志”切换到“日志”。 到目前为止,还不错,但我缺少一项关键功能——在运行时更改最低级别的能力。 在“logging',我可以调用 myLogger.setLevel(logging.I
假设我们有速度关键的系统(例如统计/分析、套接字编程等),我们如何设计跟踪和日志。 更具体地说,日志和跟踪通常会降低性能(即使我们有关闭机制或冗长的扩展机制)。在这种情况下,是否有任何关于如何“放置”
我是一名优秀的程序员,十分优秀!