gpt4 book ai didi

performance - 当rdd项很大时,为什么rdd.map(identity).cache变慢?

转载 作者:行者123 更新时间:2023-12-04 02:31:28 50 4
gpt4 key购买 nike

我发现在rdd上使用.map( identity ).cache时,如果项目很大,它将变得非常慢。否则,它几乎是瞬时的。

注意:这可能与this question有关,但是在这里我提供了一个非常精确的示例(可以直接在spark-shell中执行):

// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
val t = System.nanoTime
val out = code
println(s"time = ${(System.nanoTime - t)/1000000}ms")
out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching

// profiling
profile( rdd.count ) // around 12 ms
profile( rdd.map(identity).count ) // same
profile( rdd.cache.count ) // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!


我最初期望是时候创建一个新的rdd(容器)。但是,如果我使用大小相同但内容很少的rdd,则执行时间只有很小的差异:

val rdd = parallelize(1 to n).cache
rdd.count

profile( rdd.count ) // around 9 ms
profile( rdd.map(identity).count ) // same
profile( rdd.cache.count ) // same
profile( rdd.map(identity).cache.count ) // 15 ms


因此,看起来缓存实际上是在复制数据。我以为它也可能会浪费时间进行序列化,但是我检查了缓存是否使用了默认的MEMORY_ONLY持久性:

rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true


=>那么,是缓存复制数据,还是其他?

这实际上是我的应用程序的一个主要限制,因为我开始使用的设计与 rdd = rdd.map(f: Item => Item).cache类似,并且可以将这些功能以任意顺序应用(我无法事先确定)。

我正在使用Spark 1.6.0

编辑

当我查看spark ui->阶段选项卡->最后一个阶段(即4)时,所有任务的数据几乎相同:


持续时间= 3s(下降到3s,但仍然是2.9太多:-\)
调度器10ms
任务反序列化20ms
gc 0.1s(所有任务都有,但是为什么会触发gc ???)
结果序列化0ms
得到结果0ms
峰值执行记忆0.0B
输入大小7.0MB / 125
没有错误

最佳答案

在慢速缓存中运行jstack的进程的org.apache.spark.executor.CoarseGrainedExecutorBackend显示以下内容:

"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
java.lang.Thread.State: RUNNABLE
at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
java.lang.Thread.State: RUNNABLE
at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


SizeEstimator是缓存表面上已经存在的某些东西的主要代价之一,因为对未知对象的正确大小估计可能相当困难;如果查看 visitSingleObject方法,可以看到它严重依赖于反射,调用 getClassInfo可以访问运行时类型信息。不仅遍历了完整的对象层次结构,而且每个嵌套成员都根据 IdentityHashMap进行了检查,以检测哪些引用引用了相同的具体对象实例,因此堆栈跟踪在IdentityHashMap操作中显示了大量时间。

就示例对象而言,基本上每个项目都是从包装的整数到包装的整数的映射列表。大概Scala的内部地图实现也包含一个数组,这解释了visitSingleObject-> List.foreach-> visitSingleObject-> visitSingleObject调用层次结构。无论如何,在这种情况下,都有很多内部对象要访问,并且SizeEstimators为每个采样的对象设置一个新的IdentityHashMap。

在测量的情况下:

profile( rdd.cache.count )


由于RDD已成功缓存,因此这不算是执行缓存逻辑,因此Spark很聪明,不会重新运行缓存逻辑。实际上,通过对新的RDD创建进行概要分析并直接进行缓存,可以独立于额外的“ map(identity)”转换而隔离出缓存逻辑的确切成本。这是我的Spark会话,从最后几行继续:

scala> profile( rdd.count )
time = 91ms
res1: Long = 1000

scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000

scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6564ms
res4: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms
res5: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms
res6: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms
res7: Long = 1000


因此,您可以看到,缓慢的原因并非来自您本身进行了 map转换的事实,而是在这种情况下,〜6s似乎是计算1000个对象的缓存逻辑的基本成本。对象具有大约1,000,000至〜10,000,000内部对象(取决于Map实现的布局方式;顶部堆栈跟踪中额外的 visitArray嵌套暗示HashMap impl具有嵌套数组,这对于典型的密集线性数组是有意义的-在每个哈希表条目内探测数据结构)。

对于您的具体用例,如果可能的话,您应该在惰性缓存方面犯错,因为如果您不打算将中间结果用于许多单独的下游转换,则缓存中间结果会带来不小的折衷。但是正如您在问题中提到的那样,如果您确实使用一个RDD分支到多个不同的下游转换,那么如果原始转换非常昂贵,则可能确实需要缓存步骤。

解决方法是尝试拥有更适合恒定时间计算的内部数据结构(例如,原语数组),在这种情况下,您可以节省大量成本,避免迭代大量包装对象,并依赖于它们的反射。 SizeEstimator。

我尝试了诸如Array [Array [Int]]之类的方法,尽管仍然存在非零开销,但对于类似的数据大小,它却要好10倍:

scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28

scala> rdd.count // to trigger caching
res16: Long = 1000

scala>

scala> // profiling

scala> profile( rdd.count )
time = 29ms
res17: Long = 1000

scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000

scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 763ms
res20: Long = 1000


为了说明在任何奇特的对象上反射的代价有多糟糕,如果我删除那里的最后一个 toArray并最终使每个 bigContent成为 scala.collection.immutable.IndexedSeq[Array[Int]],则性能可以回到其慢度的约2倍之内 IndexSeq[Map[Int,Int]]原始案例:

scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.count // to trigger caching
res21: Long = 1000

scala>

scala> // profiling

scala> profile( rdd.count )
time = 27ms
res22: Long = 1000

scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000

scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 2781ms
res25: Long = 1000


如评论部分所述,您还可以考虑使用MEMORY_ONLY_SER StorageLevel,只要有一个有效的序列化程序,它就可能比SizeEstimator中使用的递归反射便宜。为此,您只需将 cache()替换为 persist(StorageLevel.MEMORY_ONLY_SER);如 this other question中所述, cache()在概念上与 persist(StorageLevel.MEMORY_ONLY)相同。

import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )


我实际上在运行Spark 1.6.1和Spark 2.0.0-preview时都尝试过此操作,其他所有与群集配置完全相同的操作(分别使用 Google Cloud Dataproc的“ 1.0”和“ preview”图像版本)。不幸的是,MEMORY_ONLY_SER技巧在Spark 1.6.1中似乎没有帮助:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6126ms
res20: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms
res21: Long = 1000


但是在Spark 2.0.0-preview中,它似乎将性能提高了10倍:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 5353ms
res19: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms
res20: Long = 1000


但是,这可能取决于您的对象。仅在序列化本身不使用大量反射的情况下,才有望提速。如果您能够有效地使用 Kryo serialization,那么对于这些​​大型对象,可能会看到使用 MEMORY_ONLY_SER的改进。

关于performance - 当rdd项很大时,为什么rdd.map(identity).cache变慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37859386/

50 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com