gpt4 book ai didi

apache-spark - Spark缓存对优化逻辑计划的影响

转载 作者:行者123 更新时间:2023-12-03 23:45:34 27 4
gpt4 key购买 nike

我正在看这个问题和很好的答案 Spark: Explicit caching can interfere with Catalyst optimizer's ability to optimize some queries?
要点是:

val df = spark.range(100)
df.join(df, Seq("id")).filter('id <20).explain(true)
通过首先应用过滤为不使用索引的系统生成足够健壮的计划:
== Optimized Logical Plan ==
Project [id#16L]
+- Join Inner, (id#16L = id#18L)
:- Filter (id#16L < 20)
: +- Range (0, 100, step=1, splits=Some(8))
+- Filter (id#18L < 20)
+- Range (0, 100, step=1, splits=Some(8))
该示例然后表明:
df.join(df, Seq("id")).cache.filter('id <20).explain(true)
生成这个计划:
== Optimized Logical Plan ==
Filter (id#16L < 20)
+- InMemoryRelation [id#16L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) Project [id#16L]
+- *(2) BroadcastHashJoin [id#16L], [id#21L], Inner, BuildRight
:- *(2) Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#112]
+- *(1) Range (0, 100, step=1, splits=8)
那么这个呢?
df.join(df, Seq("id")).filter('id <20).cache.explain(true)
产生:
== Optimized Logical Plan ==
InMemoryRelation [id#16L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Filter (id#16L < 20)
+- *(1) InMemoryTableScan [id#16L], [(id#16L < 20)]
+- InMemoryRelation [id#16L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) Project [id#16L]
+- *(2) BroadcastHashJoin [id#16L], [id#21L], Inner, BuildRight
:- *(2) Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#112]
+- *(1) Range (0, 100, step=1, splits=8)
寻求澄清。
  • 我原以为第一个 Opt Log Pl 会以缓存作为最后一个方面。我怀疑一定很简单。它是一样的吗?我想不是。
  • 最佳答案

    在这里,我确实认为您在实验中遇到了错误。
    如果您在新的 spark-shell 中运行以下命令:

    val df = spark.range(100)
    df.join(df, Seq("id")).filter('id <20).cache.explain(true)
    您将拥有以下优化的逻辑计划:
    == Optimized Logical Plan ==
    InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
    +- *(2) Project [id#0L]
    +- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
    :- *(2) Filter (id#0L < 20)
    : +- *(2) Range (0, 100, step=1, splits=12)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
    +- *(1) Filter (id#2L < 20)
    +- *(1) Range (0, 100, step=1, splits=12)
    使用下推谓词正确推送过滤器。
    但是,在新的 spark-shell 中,如果您运行:
    val df = spark.range(100)
    df.join(df, Seq("id")).cache.filter('id <20).explain(true)
    df.join(df, Seq("id")).filter('id <20).cache.explain(true)
    您将拥有以下优化的逻辑计划:
    == Optimized Logical Plan ==
    InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
    +- *(1) Filter (id#0L < 20)
    +- *(1) InMemoryTableScan [id#0L], [(id#0L < 20)]
    +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
    +- *(2) Project [id#0L]
    +- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
    :- *(2) Range (0, 100, step=1, splits=12)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
    +- *(1) Range (0, 100, step=1, splits=12)
    没有优化的计划。
    为什么 ?
    这是因为我们已经缓存了 DAG: df.join(df, Seq("id")) .
    所以即使我们用过滤器和过滤器后的缓存再次写入它,Spark-Engine 也会看到 join DAG 并从这里运行它,然后添加一个过滤器。对于 Spark 引擎,使用缓存的 Dataframe 比重新计算整个 DAG 更快。
    怎么解决 ?
    一个可以简单地 unpersist达格: df.join(df, Seq("id")).unpersist()然后 df.join(df, Seq("id")).filter('id <20).cache.explain(true)给出正确的 OLP

    关于apache-spark - Spark缓存对优化逻辑计划的影响,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63022569/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com