gpt4 book ai didi

scala - Spark 执行器上的对象缓存

转载 作者:行者123 更新时间:2023-12-03 22:35:27 26 4
gpt4 key购买 nike

Spark 专家的一个好问题。

我正在处理 map 中的数据操作(RDD)。在映射器函数中,我需要查找类 A 的对象用于处理 RDD 中的元素。

因为这将在执行器上执行并创建 A 类型的元素(将被查找)恰好是一项昂贵的操作,我想在每个执行程序上预加载和缓存这些对象。最好的方法是什么?

  • 一个想法是广播一个查找表,但是类 A不可序列化(无法控制其实现)。
  • 另一个想法是将它们加载到单例对象中。但是,我想控制加载到该查找表中的内容(例如,不同 Spark 作业上可能有不同的数据)。

  • 理想情况下,我想通过一个参数指定将在执行程序上加载一次(包括流式处理的情况,以便查找表在批处理之间保留在内存中),通过在驱动程序启动期间可用的参数,在任何之前数据得到处理。

    是否有一种干净而优雅的方式来做到这一点,或者是不可能实现的?

    最佳答案

    这正是 broadcast. 的目标用例广播变量被传输一次,并使用种子有效地移动到所有执行程序,并保留在内存/本地磁盘中,直到您不再需要它们。

    在使用其他人的接口(interface)时,序列化经常会作为一个问题出现。如果您可以强制您使用的对象是可序列化的,那将是最好的解决方案。如果这是不可能的,你的生活就会变得更加复杂。如果无法序列​​化 A对象,那么您必须在每个任务的执行程序上创建它们。如果它们存储在某处的文件中,则如下所示:

    rdd.mapPartitions { it => 
    val lookupTable = loadLookupTable(path)
    it.map(elem => fn(lookupTable, elem))
    }

    请注意,如果您使用此模型,则必须为每个任务加载一次查找表——您无法从广播变量的跨任务持久性中受益。

    编辑:这是另一个模型,我相信它可以让您在每个 JVM 的任务之间共享查找表。
    class BroadcastableLookupTable {
    @transient val lookupTable: LookupTable[A] = null

    def get: LookupTable[A] = {
    if (lookupTable == null)
    lookupTable = < load lookup table from disk>
    lookupTable
    }
    }

    此类可以广播(不传输任何实质性内容),并且在每个 JVM 第一次调用它时,您将加载查找表并返回它。

    关于scala - Spark 执行器上的对象缓存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40435741/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com