gpt4 book ai didi

python - 如果我缓存一个 Spark Dataframe 然后覆盖引用,原始数据帧还会被缓存吗?

转载 作者:行者123 更新时间:2023-12-03 14:26:38 28 4
gpt4 key购买 nike

假设我有一个生成 (py)spark 数据帧的函数,将数据帧缓存到内存中作为最后一个操作。

def gen_func(inputs):
df = ... do stuff...
df.cache()
df.count()
return df
据我了解,Spark 的缓存工作如下:
  • cache/persist加上一个 Action ( count() )在数据上被调用
    帧,它从它的 DAG 计算并缓存到内存中,附加
    指向它的对象。
  • 只要存在对该对象的引用,可能在其他函数/其他范围内,df 将继续被缓存,并且依赖 df 的所有 DAG 都将使用内存中缓存的数据作为起点。
  • 如果对 df 的所有引用都被删除,Spark 会将缓存作为内存进行垃圾收集。它可能不会立即被垃圾收集,导致一些短期内存块(特别是如果您生成缓存数据并太快丢弃它们会导致内存泄漏),但最终它会被清除。

  • 我的问题是,假设我使用 gen_func生成数据帧,然后覆盖原始数据帧引用(可能使用 filterwithColumn )。
    df=gen_func(inputs)
    df=df.filter("some_col = some_val")
    在 Spark 中,RDD/DF 是不可变的,因此过滤器后重新分配的 df 和过滤器前的 df 指的是两个完全不同的对象。在这种情况下,对原始 df 的引用是 cache/counted已被覆盖。这是否意味着缓存的数据帧不再可用并且将被垃圾收集?这是否意味着新的后置过滤器 df尽管是从以前缓存的数据帧生成的,但会从头开始计算所有内容吗?
    我问这个是因为我最近正在用我的代码修复一些内存不足的问题,在我看来缓存可能是问题所在。但是,我还没有真正了解使用缓存的安全方法的全部细节,以及如何意外地使缓存内存无效。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?

    最佳答案

    我做了几个实验,如下所示。显然,数据帧,一旦缓存,仍然缓存 (如 getPersistentRDDs 和查询计划 - InMemory 等所示),即使使用 del 覆盖或删除了所有 Python 引用,并明确调用垃圾收集。
    实验一:

    def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

    sc._jsc.getPersistentRDDs()

    df = func()
    sc._jsc.getPersistentRDDs()

    df2 = df.filter('col1 != 2')
    del df
    import gc
    gc.collect()
    sc._jvm.System.gc()
    sc._jsc.getPersistentRDDs()

    df2.select('*').explain()

    del df2
    gc.collect()
    sc._jvm.System.gc()
    sc._jsc.getPersistentRDDs()
    结果:
    >>> def func():
    ... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    ... data.cache()
    ... data.count()
    ... return data
    ...
    >>> sc._jsc.getPersistentRDDs()
    {}

    >>> df = func()
    >>> sc._jsc.getPersistentRDDs()
    {71: JavaObject id=o234}

    >>> df2 = df.filter('col1 != 2')
    >>> del df
    >>> import gc
    >>> gc.collect()
    93
    >>> sc._jvm.System.gc()
    >>> sc._jsc.getPersistentRDDs()
    {71: JavaObject id=o240}

    >>> df2.select('*').explain()
    == Physical Plan ==
    *(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
    +- *(1) ColumnarToRow
    +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
    +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
    +- *(1) Project [_1#172L AS col1#174L]
    +- *(1) Scan ExistingRDD[_1#172L]

    >>> del df2
    >>> gc.collect()
    85
    >>> sc._jvm.System.gc()
    >>> sc._jsc.getPersistentRDDs()
    {71: JavaObject id=o250}
    实验二:
    def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

    sc._jsc.getPersistentRDDs()

    df = func()
    sc._jsc.getPersistentRDDs()

    df = df.filter('col1 != 2')
    import gc
    gc.collect()
    sc._jvm.System.gc()
    sc._jsc.getPersistentRDDs()

    df.select('*').explain()

    del df
    gc.collect()
    sc._jvm.System.gc()
    sc._jsc.getPersistentRDDs()
    结果:
    >>> def func():
    ... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    ... data.cache()
    ... data.count()
    ... return data
    ...
    >>> sc._jsc.getPersistentRDDs()
    {}

    >>> df = func()
    >>> sc._jsc.getPersistentRDDs()
    {86: JavaObject id=o317}

    >>> df = df.filter('col1 != 2')
    >>> import gc
    >>> gc.collect()
    244
    >>> sc._jvm.System.gc()
    >>> sc._jsc.getPersistentRDDs()
    {86: JavaObject id=o323}

    >>> df.select('*').explain()
    == Physical Plan ==
    *(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
    +- *(1) ColumnarToRow
    +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
    +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
    +- *(1) Project [_1#218L AS col1#220L]
    +- *(1) Scan ExistingRDD[_1#218L]

    >>> del df
    >>> gc.collect()
    85
    >>> sc._jvm.System.gc()
    >>> sc._jsc.getPersistentRDDs()
    {86: JavaObject id=o333}
    实验 3(对照实验,证明 unpersist 有效)
    def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

    sc._jsc.getPersistentRDDs()

    df = func()
    sc._jsc.getPersistentRDDs()

    df2 = df.filter('col1 != 2')
    df2.select('*').explain()

    df.unpersist()
    df2.select('*').explain()
    结果:
    >>> def func():
    ... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    ... data.cache()
    ... data.count()
    ... return data
    ...
    >>> sc._jsc.getPersistentRDDs()
    {}

    >>> df = func()
    >>> sc._jsc.getPersistentRDDs()
    {116: JavaObject id=o398}

    >>> df2 = df.filter('col1 != 2')
    >>> df2.select('*').explain()
    == Physical Plan ==
    *(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
    +- *(1) ColumnarToRow
    +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
    +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
    +- *(1) Project [_1#310L AS col1#312L]
    +- *(1) Scan ExistingRDD[_1#310L]

    >>> df.unpersist()
    DataFrame[col1: bigint]
    >>> sc._jsc.getPersistentRDDs()
    {}

    >>> df2.select('*').explain()
    == Physical Plan ==
    *(1) Project [_1#310L AS col1#312L]
    +- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
    +- *(1) Scan ExistingRDD[_1#310L]
    要回答OP的问题:

    Does that mean that the cached data frame is no longer available and will be garbage collected? Does that mean that the new post-filter df will compute everything from scratch, despite being generated from a previously cached data frame?


    实验建议 没有对彼此而言。数据帧保持缓存状态,不会被垃圾收集,并且根据查询计划,使用缓存的(不可引用的)数据帧计算新的数据帧。
    一些与缓存使用相关的有用功能(如果您不想通过 Spark UI 执行此操作)是: sc._jsc.getPersistentRDDs() ,它显示了缓存的 RDD/数据帧的列表,以及 spark.catalog.clearCache() ,这会清除所有缓存的 RDD/数据帧。

    Am I deviating from best practice in doing the above?


    我无权就此评判你,但正如其中一条评论所建议的,避免重新分配给 df因为数据帧是不可变的。试着想象你在 scala 中编码并且你定义了 df作为 val .做 df = df.filter(...)是不可能的。 Python 本身不能强制执行,但我认为最好的做法是避免覆盖任何数据帧变量,以便您始终可以调用 df.unpersist()之后,如果您不再需要缓存的结果。

    关于python - 如果我缓存一个 Spark Dataframe 然后覆盖引用,原始数据帧还会被缓存吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60255595/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com