gpt4 book ai didi

scala - Spark程序性能——GC&任务反序列化&并发执行

转载 作者:行者123 更新时间:2023-12-02 05:48:39 30 4
gpt4 key购买 nike

我有一个4台机器的集群,1台master,3台worker,每台128G内存,64个核心。我在独立模式下使用 Spark 1.5.0。我的程序使用 JDBC 从 Oracle 表中读取数据,然后执行 ETL、操作数据,并执行 k-means 等机器学习任务。

我有一个 DataFrame (myDF.cache()),它是与其他两个 DataFrame 的连接结果,并被缓存。 DataFrame包含2700万行,数据大小约为1.5G。我需要过滤数据并计算24个直方图,如下:

val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket) 
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket)
// ......
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket)

问题:

  1. 由于我的 DataFrame 已缓存,因此我预计筛选、选择和直方图会非常快。但每次计算的实际时间约为7秒,这是 Not Acceptable 。从 UI 来看,GC 时间需要 5 秒,任务反序列化时间需要 4 秒。我尝试了不同的 JVM 参数,但无法进一步改进。现在我正在使用

    -Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
    -XX:ParallelGCThreads=32 \
    -XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70

令我困惑的是,数据的大小与可用内存相比根本不算什么。为什么每次运行过滤/选择/直方图时都会启动 GC?有没有办法减少GC时间和任务反序列化时间?

  • 我必须对 h[1-24] 进行并行计算,而不是顺序计算。我尝试了 Future,例如:

    import scala.concurrent.{Await, Future, blocking} 

    import scala.concurrent.ExecutionContext.Implicits.global

    val f1 = Future{myDF.filter("pmod(idx, 24) = 1").count}
    val f2 = Future{myDF.filter("pmod(idx, 24) = 2").count}
    val f3 = Future{myDF.filter("pmod(idx, 24) = 3").count}

    val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
    c1 + c2 + c3
    }

    val summ = Await.result(future, 180 second)
  • 问题是,这里的 Future 仅意味着作业几乎同时提交给调度程序,而不是它们最终被调度并同时运行。这里使用 Future 根本不会提高性能。

    如何让24个计算作业同时运行?

    最佳答案

    您可以尝试以下几件事:

    1. 不要再次计算 pmod(idx, 24)。相反,您可以简单地计算一次:

      import org.apache.spark.sql.functions.{pmod, lit}

      val myDfWithBuckets = myDF.withColumn("bucket", pmod($"idx", lit(24)))
    2. 使用SQLContext.cacheTable而不是cache。它使用压缩列式存储来存储表,该存储可用于仅访问所需的列,如 Spark SQL and DataFrame Guide 中所述。 “将自动调整压缩以最小化内存使用和 GC 压力”。

      myDfWithBuckets.registerTempTable("myDfWithBuckets")
      sqlContext.cacheTable("myDfWithBuckets")
    3. 如果可以的话,仅缓存您实际需要的列,而不是每次都进行投影。

    4. 我不清楚 histogram 方法的来源是什么(您是否转换为 RDD[Double] 并使用 DoubleRDDFunctions.直方图?)以及参数是什么,但如果您想同时计算所有直方图,您可以尝试groupBy存储并应用一次直方图,例如使用histogram_numeric UDF:

      import org.apache.spark.sql.functions.callUDF

      val n: Int = ???

      myDfWithBuckets
      .groupBy($"bucket")
      .agg(callUDF("histogram_numeric", $"col1", lit(n)))

      如果您使用预定义范围,则可以使用自定义 UDF 获得类似的效果。

    注释

    • 如何提取由histogram_numeric计算的值?首先让我们创建一个小助手

      import org.apache.spark.sql.Row

      def extractBuckets(xs: Seq[Row]): Seq[(Double, Double)] =
      xs.map(x => (x.getDouble(0), x.getDouble(1)))

      现在我们可以使用模式匹配进行映射,如下所示:

      import org.apache.spark.rdd.RDD

      val histogramsRDD: RDD[(Int, Seq[(Double, Double)])] = histograms.map{
      case Row(k: Int, hs: Seq[Row @unchecked]) => (k, extractBuckets(hs)) }

    关于scala - Spark程序性能——GC&任务反序列化&并发执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33705134/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com