apache-spark - Spark - Using off-heap memory

Reposted | Author: 行者123 | Updated: 2023-12-04 14:02:22

When spark.memory.offHeap.enabled=true, Spark can use off-heap memory for shuffle and for caching (StorageLevel.OFF_HEAP). Can off-heap memory also be used to store broadcast variables? If so, how?

Best answer

In short, you cannot use StorageLevel.OFF_HEAP for broadcast variables.

To see why, let's look at the source code of the SparkContext.broadcast(...) method:

/**
 * Broadcast a read-only variable to the cluster, returning a
 * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions ...
 */
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  // ...
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  // ...
  bc
}

In the code above, broadcastManager.newBroadcast(...) creates the Broadcast object that this method returns.

Now let's dig deeper and examine newBroadcast():

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}

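In the code above, broadcastManager has a component called broadcastFactory and, following the abstract factory design pattern, delegates the creation of the broadcast variable to that factory.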
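The delegation can be pictured with a small, self-contained Scala model. All names here (BroadcastLike, Factory, TorrentLikeFactory, Manager) are hypothetical simplifications, not Spark's classes: the manager holds only an abstract factory and forwards each creation request to it, so a different factory could change how broadcasts are built without touching callers.

```scala
// Hypothetical, simplified model of the abstract-factory delegation
// described above. None of these names are Spark's real classes.

// The product: a read-only handle to a broadcast value.
trait BroadcastLike[T] {
  def id: Long
  def value: T
}

// The abstract factory: knows how to build broadcast handles.
trait Factory {
  def newBroadcast[T](value: T, id: Long): BroadcastLike[T]
}

// One concrete factory, standing in for TorrentBroadcastFactory.
final class TorrentLikeFactory extends Factory {
  private final class TorrentLike[T](val id: Long, v: T) extends BroadcastLike[T] {
    def value: T = v
  }

  def newBroadcast[T](value: T, id: Long): BroadcastLike[T] =
    new TorrentLike(id, value)
}

// The manager only knows the abstract Factory and delegates creation to it.
final class Manager(factory: Factory) {
  private var nextId = 0L // plain counter; not thread-safe in this sketch

  def newBroadcast[T](value: T): BroadcastLike[T] = {
    val bc = factory.newBroadcast(value, nextId)
    nextId += 1
    bc
  }
}
```

With this model, new Manager(new TorrentLikeFactory).newBroadcast(Seq(1, 2, 3)) returns a handle whose value is Seq(1, 2, 3) and whose id is 0.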

Also note that BroadcastManager tracks a unique id for each broadcast variable, incremented for every new broadcast variable.
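Handing out unique ids only needs an atomic counter. The sketch below assumes an AtomicLong-style counter, as the nextBroadcastId.getAndIncrement() call in the excerpt suggests; IdAllocator and IdDemo are hypothetical names. getAndIncrement returns the current value and then increments it in one atomic step, so ids stay unique even when several threads create broadcasts at once.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical allocator modeled on nextBroadcastId.getAndIncrement():
// each call returns the current value, then increments the counter atomically.
final class IdAllocator(start: Long = 0L) {
  private val next = new AtomicLong(start)
  def allocate(): Long = next.getAndIncrement()
}

object IdDemo {
  // Request ids from several threads at once. Each thread writes only its
  // own slots, and Thread.join() makes those writes visible to the caller.
  def concurrentIds(threads: Int, perThread: Int): Array[Long] = {
    val alloc = new IdAllocator()
    val out = new Array[Long](threads * perThread)
    val workers = (0 until threads).map { t =>
      new Thread(() => {
        for (i <- 0 until perThread) out(t * perThread + i) = alloc.allocate()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    out
  }
}
```

Usage: IdDemo.concurrentIds(4, 1000) yields 4000 ids with no duplicates, covering exactly 0 through 3999.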

Currently there is only one BroadcastFactory that Spark can initialize: TorrentBroadcastFactory. This can be seen in BroadcastManager's initialization code:

// Called by SparkContext or Executor before using Broadcast
private def initialize(): Unit = {
  // ...
  broadcastFactory = new TorrentBroadcastFactory
  // ...
}

Quoting the source code of TorrentBroadcastFactory:

Broadcast implementation that uses a BitTorrent-like protocol to do a distributed transfer of the broadcasted data to the executors.

This factory uses TorrentBroadcast, whose class description is very informative:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.
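The driver-side chunking can be sketched with plain Java serialization. This is a simplified model under stated assumptions, not TorrentBroadcast itself: Spark uses its configured serializer (optionally compressed), and Chunker, blockify, storePieces and the broadcast_<id>_piece<n> key format below are illustrative. The idea carries over: serialize once, cut the bytes into fixed-size pieces, and store each piece under its own key so it can be fetched independently.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical chunker; not Spark's TorrentBroadcast.
object Chunker {
  // Serialize a value with plain Java serialization (Spark uses its own serializer).
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Cut the serialized bytes into pieces of at most blockSize bytes.
  def blockify(bytes: Array[Byte], blockSize: Int): Vector[Array[Byte]] =
    bytes.grouped(blockSize).toVector

  // Store each piece under its own (illustrative) key, standing in for a local block store.
  def storePieces(broadcastId: Long, value: AnyRef, blockSize: Int): Map[String, Array[Byte]] =
    blockify(serialize(value), blockSize).zipWithIndex.map { case (piece, i) =>
      s"broadcast_${broadcastId}_piece$i" -> piece
    }.toMap
}
```

Concatenating the pieces in index order and deserializing them gives back the original value, which is what a receiver does once it holds every piece.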

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available. Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from. This prevents the driver from being the bottleneck in sending out multiple copies of the broadcast data (one per executor).
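The executor-side behaviour in this quote (local lookup first, then remote fetches, then keeping a local copy for others) can be modelled with in-memory maps. Node and TorrentFetch are hypothetical names, not Spark's BlockManager API, and the sketch simply takes the first peer that holds a piece, whereas Spark may choose sources differently.

```scala
import scala.collection.mutable

// Hypothetical model of a node's local block store (not Spark's BlockManager API).
final class Node(val name: String) {
  val blocks: mutable.Map[String, Array[Byte]] = mutable.Map.empty
}

object TorrentFetch {
  // For each piece: use the local copy if present; otherwise pull it from the
  // first peer (driver or another executor) that has it, and keep a local copy
  // so later executors can fetch from this node.
  // Returns, per piece id, the name of the node that served it.
  def fetchPieces(local: Node, peers: Seq[Node], pieceIds: Seq[String]): Map[String, String] =
    pieceIds.map { pid =>
      local.blocks.get(pid) match {
        case Some(_) => pid -> local.name
        case None =>
          val source = peers
            .find(_.blocks.contains(pid))
            .getOrElse(throw new NoSuchElementException(s"no peer holds $pid"))
          local.blocks(pid) = source.blocks(pid)
          pid -> source.name
      }
    }.toMap
}
```

For example, if the driver holds both pieces and exec-1 fetches them first, a second executor that lists exec-1 among its peers is served by exec-1 rather than the driver, which is how the driver stops being the only sender.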

Reading the writeBlocks function of the TorrentBroadcast class, we can see that the storage level for the broadcast pieces is hardcoded to StorageLevel.MEMORY_AND_DISK_SER:

/**
 * Divide the object into multiple blocks and put those blocks in the block manager.
 *
 * @param value the object to divide
 * @return number of blocks this broadcast variable is divided into
 */
private def writeBlocks(value: T): Int = {
  import StorageLevel._
  // ...
  if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
    throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
      s"in local BlockManager")
  }
  // ...
}

Because this code hardcodes StorageLevel.MEMORY_AND_DISK_SER, there is no way to make broadcast variables use StorageLevel.OFF_HEAP.
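To make the consequence concrete, a storage level can be modelled by the four flags Spark's StorageLevel carries: useDisk, useMemory, useOffHeap and deserialized. In Spark, MEMORY_AND_DISK_SER has useOffHeap = false, while OFF_HEAP has it set to true. Level, Levels, the memory-mode objects and BroadcastWritePath below are hypothetical stand-ins; deriving the memory mode from useOffHeap alone mirrors, as far as this sketch goes, how Spark picks on-heap versus off-heap storage. Since the level is a constant of the write path rather than a parameter, broadcast pieces always end up on-heap.

```scala
// Hypothetical model of the memory pool a level targets.
sealed trait MemoryMode
case object OnHeapMode extends MemoryMode
case object OffHeapMode extends MemoryMode

// Hypothetical model of the four flags carried by Spark's StorageLevel.
final case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean) {
  // The target memory pool is decided solely by useOffHeap.
  def memoryMode: MemoryMode = if (useOffHeap) OffHeapMode else OnHeapMode
}

object Levels {
  // Flag values of the corresponding Spark storage levels.
  val MemoryAndDiskSer: Level = Level(useDisk = true, useMemory = true, useOffHeap = false, deserialized = false)
  val OffHeapLevel: Level = Level(useDisk = true, useMemory = true, useOffHeap = true, deserialized = false)
}

object BroadcastWritePath {
  // Fixed inside the write path, as in the writeBlocks excerpt;
  // callers have no way to pass a different level.
  val PieceLevel: Level = Levels.MemoryAndDiskSer

  // Memory pool a broadcast piece lands in when it is kept in memory.
  def pieceMemoryMode: MemoryMode = PieceLevel.memoryMode
}
```

Under this model, BroadcastWritePath.pieceMemoryMode is always OnHeapMode, regardless of whether off-heap memory is enabled for shuffle and caching.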

A similar question about apache-spark - Spark - Using off-heap memory can be found on Stack Overflow: https://stackoverflow.com/questions/69587778/
