gpt4 book ai didi

scala - Apache Spark 中的 CPU 使用率是否受限?

转载 作者:行者123 更新时间:2023-12-01 00:40:40 26 4
gpt4 key购买 nike

我最近发现,即使在 local[1] 中运行 spark 时,在 UDF 中添加并行计算(例如使用并行集合)也能显着提高性能。模式或使用具有 1 个执行器和 1 个核心的 Yarn。

例如。在 local[1]模式下,Spark-Jobs 消耗尽可能多的 CPU(即,如果我有 8 个内核,则为 800%,使用 top 测量)。

这看起来很奇怪,因为我认为 Spark(或 yarn)限制了每个 Spark 应用程序的 CPU 使用率?

所以我想知道为什么会这样,是否建议在 spark 中使用并行处理/多线程,还是应该坚持使用 sparks 并行化模式?

这是一个示例(在具有 1 个实例和 1 个核心的 yarn 客户端模式中测量的时间)

case class MyRow(id:Int,data:Seq[Double])

// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()

df.show() // trigger computation and caching

// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d

val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)

df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds

最佳答案

Spark 不直接限制 CPU,而是定义了 Spark 创建的并发线程数。因此,对于 local[1],它基本上一次并行运行一项任务。当您执行 in.par.map{expensive} 时,您正在创建 spark 无法管理的线程,因此不受此限制处理。即您告诉 spark 将自身限制为单个线程,然后在 spark 不知道的情况下创建了其他线程。

通常,在 spark 操作中执行并行线程并不是一个好主意。相反,最好告诉 spark 它可以使用多少个线程,并确保您有足够的并行分区。

关于scala - Apache Spark 中的 CPU 使用率是否受限?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42619875/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com