gpt4 book ai didi

基于Spark的大规模日志分析

转载 作者:我是一只小鸟 更新时间:2023-06-15 14:31:50 27 4
gpt4 key购买 nike

摘要: 本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。

本文分享自华为云社区《 【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】 》,作者:上进小菜猪.

随着互联网的普及和应用范围的扩大,越来越多的应用场景需要对海量数据进行高效地处理和分析,这就要求我们必须具备大数据技术方面的知识和技能。本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解.

1.数据来源

我们的项目是针对某购物网站的访问日志进行分析,其中主要包含以下几个字段:

  • IP:访问的客户端 IP 地址
  • Time:访问时间
  • Url:访问的 URL 地址
  • User-Agent:浏览器标识符

原始数据规模约为 100GB,我们需要对其进行清洗、统计和分析,以得到有用的信息和价值.

2. 数据清洗

由于原始数据存在缺失值、异常值、重复值等问题,因此我们需要进行数据清洗,主要包括以下步骤:

  1. 将原始数据进行格式转换,方便后续处理
  2. 对 IP、Time、Url 和 User-Agent 字段进行解析和提取
  3. 去除不合法的记录和重复的记录

具体代码实现如下:

                            
                              import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Locale
​

                            
                            
                              object
                            
                            
                               DataCleaning {
  def main(args: Array[String]) {
    val conf 
                            
                            = 
                            
                              new
                            
                             SparkConf().setAppName(
                            
                              "
                            
                            
                              DataCleaning
                            
                            
                              "
                            
                            
                              )
    val sc 
                            
                            = 
                            
                              new
                            
                            
                               SparkContext(conf)
    val data 
                            
                            = sc.textFile(
                            
                              "
                            
                            
                              hdfs://master:9000/log/access.log
                            
                            
                              "
                            
                            
                              )
​
 
                            
                            
                              //
                            
                            
                               定义时间格式及地区信息
                            
                            
    val dateFormat = 
                            
                              new
                            
                             SimpleDateFormat(
                            
                              "
                            
                            
                              dd/MMM/yyyy:HH:mm:ss Z
                            
                            
                              "
                            
                            
                              , Locale.ENGLISH)
​
 
                            
                            
                              //
                            
                            
                               数据清洗
                            
                            
    val cleanData = data.map(line =>
                            
                               {
      val arr 
                            
                            = line.split(
                            
                              "
                            
                            
                              "
                            
                            
                              )
 
                            
                            
                              if
                            
                             (arr.length >= 
                            
                              9
                            
                            
                              ) {
 
                            
                            
                              //
                            
                            
                               解析 IP
                            
                            
        val ip = arr(
                            
                              0
                            
                            
                              )
​
 
                            
                            
                              //
                            
                            
                               解析时间,转换为 Unix 时间戳
                            
                            
        val time = dateFormat.parse(arr(
                            
                              3
                            
                            ) + 
                            
                              "
                            
                            
                              "
                            
                             + arr(
                            
                              4
                            
                            )).getTime / 
                            
                              1000
                            
                            
                            
                              //
                            
                            
                               解析 URL
                            
                            
        val url = urlDecode(arr(
                            
                              6
                            
                            
                              ))
​
 
                            
                            
                              //
                            
                            
                               解析 UserAgent
                            
                            
        val ua = arr(
                            
                              8
                            
                            
                              )
​
 (ip, time, url, ua)
 }
 }).filter(x 
                            
                            => x != 
                            
                              null
                            
                            
                              ).distinct()
​
 
                            
                            
                              //
                            
                            
                               结果输出
                            
                            
    cleanData.saveAsTextFile(
                            
                              "
                            
                            
                              hdfs://master:9000/cleanData
                            
                            
                              "
                            
                            
                              )
​
    sc.stop()
 }
​
 
                            
                            
                              //
                            
                            
                               URL 解码
                            
                            
  def urlDecode(url: String): String =
                            
                               {
    java.net.URLDecoder.decode(url, 
                            
                            
                              "
                            
                            
                              utf-8
                            
                            
                              "
                            
                            
                              )
 }
}
                            
                          

3. 数据统计

对于大规模数据的处理,我们可以使用 Spark 提供的强大的分布式计算能力,以提高处理效率和减少计算时间.

我们这里使用 Spark SQL 统计每个 URL 的访问量,并输出前 10 个访问量最高的 URL,代码如下:

                            
                              import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
​

                            
                            
                              case
                            
                            
                              class
                            
                            
                               LogRecord(ip: String, time: Long, url: String, ua: String)
​

                            
                            
                              object
                            
                            
                               DataAnalysis {
  def main(args: Array[String]) {
    val conf 
                            
                            = 
                            
                              new
                            
                             SparkConf().setAppName(
                            
                              "
                            
                            
                              DataAnalysis
                            
                            
                              "
                            
                            
                              )
    val sc 
                            
                            = 
                            
                              new
                            
                            
                               SparkContext(conf)
    val sqlContext 
                            
                            = 
                            
                              new
                            
                            
                               SQLContext(sc)
​
 
                            
                            
                              //
                            
                            
                               读取清洗后的数据
                            
                            
    val cleanData = sc.textFile(
                            
                              "
                            
                            
                              hdfs://master:9000/cleanData
                            
                            
                              "
                            
                            ).filter(x => x != 
                            
                              null
                            
                            
                              )
​
 
                            
                            
                              //
                            
                            
                               将数据转换为 DataFrame
                            
                            
                               import sqlContext.implicits._
    val logDF 
                            
                            = cleanData.map(_.split(
                            
                              "
                            
                            
                              ,
                            
                            
                              "
                            
                            )).map(p => LogRecord(p(
                            
                              0
                            
                            ), p(
                            
                              1
                            
                            ).toLong, p(
                            
                              2
                            
                            ), p(
                            
                              3
                            
                            
                              ))).toDF()
​
 
                            
                            
                              //
                            
                            
                               统计每个 URL 的访问量,并按访问量降序排序
                            
                            
    val topUrls = logDF.groupBy(
                            
                              "
                            
                            
                              url
                            
                            
                              "
                            
                            ).count().sort($
                            
                              "
                            
                            
                              count
                            
                            
                              "
                            
                            
                              .desc)
​
 
                            
                            
                              //
                            
                            
                               输出前 10 个访问量最高的 URL
                            
                            
    topUrls.take(
                            
                              10
                            
                            ).
                            
                              foreach
                            
                            
                              (println)
​
    sc.stop()
 }
}
                            
                          

4. 数据可视化

数据可视化是将处理和分析后的数据以图表或图像的方式展示出来,有利于我们直观地观察数据的规律和趋势.

我们这里采用 Python 的 Matplotlib 库将前 10 个访问量最高的 URL 可视化,代码如下:

                            import matplotlib.pyplot 
                            
                              as
                            
                            
                               plt
​
# 读取数据
with open(
                            
                            
                              '
                            
                            
                              topUrls.txt
                            
                            
                              '
                            
                            , 
                            
                              '
                            
                            
                              r
                            
                            
                              '
                            
                            ) 
                            
                              as
                            
                            
                               f:
    line 
                            
                            =
                            
                               f.readline()
    urls 
                            
                            =
                            
                               []
    counts 
                            
                            =
                            
                               []
 
                            
                            
                              while
                            
                             line and len(urls) < 
                            
                              10
                            
                            
                              :
        url, count 
                            
                            = line.strip().split(
                            
                              '
                            
                            
                              ,
                            
                            
                              '
                            
                            
                              )
        urls.append(url)
        counts.append(
                            
                            
                              int
                            
                            
                              (count))
        line 
                            
                            =
                            
                               f.readline()
# 绘制直方图
plt.bar(range(
                            
                            
                              10
                            
                            ), counts, align=
                            
                              '
                            
                            
                              center
                            
                            
                              '
                            
                            
                              )
plt.xticks(range(
                            
                            
                              10
                            
                            ), urls, rotation=
                            
                              90
                            
                            
                              )
plt.xlabel(
                            
                            
                              '
                            
                            
                              Url
                            
                            
                              '
                            
                            
                              )
plt.ylabel(
                            
                            
                              '
                            
                            
                              Count
                            
                            
                              '
                            
                            
                              )
plt.title(
                            
                            
                              '
                            
                            
                              Top 10 Url
                            
                            
                              '
                            
                            
                              )
plt.show()
                            
                          

在进行数据清洗前,需要先对原始日志数据进行筛选,选取需要分析的字段。然后进行数据清洗,去掉不必要的空格、特殊字符等,使数据更加规整,并增加可读性.

下面是数据清洗的代码示例:

                            val originalRdd = spark.sparkContext.textFile(
                            
                              "
                            
                            
                              path/to/logfile
                            
                            
                              "
                            
                            
                              )
​
val filteredRdd 
                            
                            = originalRdd.filter(line =>
                            
                               {
  val tokens 
                            
                            = line.split(
                            
                              "
                            
                            
                              \t
                            
                            
                              "
                            
                            
                              )
  tokens.length 
                            
                            >= 
                            
                              10
                            
                             &&
                            
                              
 tokens(
                            
                            
                              0
                            
                            ).matches(
                            
                              "
                            
                            
                              \d{4}-\d{2}-\d{2}
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              1
                            
                            ).matches(
                            
                              "
                            
                            
                              \d{2}:\d{2}:\d{2}
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              2
                            
                            ).matches(
                            
                              "
                            
                            
                              \d+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              3
                            
                            ).matches(
                            
                              "
                            
                            
                              \d+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              4
                            
                            ).matches(
                            
                              "
                            
                            
                              \d+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              5
                            
                            ).matches(
                            
                              "
                            
                            
                              \d+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              6
                            
                            ).matches(
                            
                              "
                            
                            
                              .+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              7
                            
                            ).matches(
                            
                              "
                            
                            
                              .+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              8
                            
                            ).matches(
                            
                              "
                            
                            
                              .+
                            
                            
                              "
                            
                            ) &&
                            
                              
 tokens(
                            
                            
                              9
                            
                            ).matches(
                            
                              "
                            
                            
                              .+
                            
                            
                              "
                            
                            
                              )
})
​
val cleanedRdd 
                            
                            = filteredRdd.map(line =>
                            
                               {
  val tokens 
                            
                            = line.split(
                            
                              "
                            
                            
                              \t
                            
                            
                              "
                            
                            
                              )
  val timestamp 
                            
                            = s
                            
                              "
                            
                            
                              ${tokens(0)} ${tokens(1)}
                            
                            
                              "
                            
                            
                              
  val request 
                            
                            = tokens(
                            
                              6
                            
                            ).replaceAll(
                            
                              """
                            
                            
                              , 
                            
                            
                              ""
                            
                            
                              )
                            
                            
  val responseCode = tokens(
                            
                              8
                            
                            
                              ).toInt
 (timestamp, request, responseCode)
})
                            
                          

​在上述代码中,我们首先读取原始日志数据,并使用filter函数过滤掉不符合条件的行;然后使用map函数将数据转换为元组的形式,并进行清洗。其中,元组的三个元素分别是时间戳、请求内容和响应状态码.

接下来,让我们来介绍一下如何使用Spark进行数据统计.

数据统计是大规模数据分析中非常重要的一个环节。Spark提供了丰富的聚合函数,可用于对数据进行各种统计分析.

下面是对清洗后的数据进行统计分析的代码示例:

                            
                              import org.apache.spark.sql.functions._
​
val df 
                            
                            = spark.createDataFrame(cleanedRdd).toDF(
                            
                              "
                            
                            
                              timestamp
                            
                            
                              "
                            
                            , 
                            
                              "
                            
                            
                              request
                            
                            
                              "
                            
                            , 
                            
                              "
                            
                            
                              responseCode
                            
                            
                              "
                            
                            
                              )
val totalCount 
                            
                            =
                            
                               df.count()
val errorsCount 
                            
                            = df.filter(col(
                            
                              "
                            
                            
                              responseCode
                            
                            
                              "
                            
                            ) >= 
                            
                              400
                            
                            
                              ).count()
val successCount 
                            
                            = totalCount -
                            
                               errorsCount
val topEndpoints 
                            
                            = df.groupBy(
                            
                              "
                            
                            
                              request
                            
                            
                              "
                            
                            ).count().orderBy(desc(
                            
                              "
                            
                            
                              count
                            
                            
                              "
                            
                            )).limit(
                            
                              10
                            
                            
                              )
topEndpoints.show()
                            
                          

在上面的代码中,我们首先将清洗后的数据转换为DataFrame,然后使用count函数计算总记录数和错误记录数,并计算成功记录数。最后使用groupBy和orderBy函数按照请求内容,对数据进行分组统计,并打印出请求次数最多的前10个端点.

通过可视化,我们可以清楚地看到前 10 个访问量最高的 URL 地址及其访问量,这对于进一步分析和优化网站的性能和用户体验具有重要的意义.

总结起来,这就是我们的一个大数据实战项目,我们使用 Spark 统计了购物网站的访问量,并通过 Python 的 Matplotlib 库将结果可视化。这个过程中,我们运用了数据清洗、Spark SQL 统计和可视化等技术,为大规模数据的处理和分析提供了有效的解决方案.

  。

点击关注,第一时间了解华为云新鲜技术~ 。

最后此篇关于基于Spark的大规模日志分析的文章就讲到这里了,如果你想了解更多关于基于Spark的大规模日志分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com