gpt4 book ai didi

python-3.x - pyspark 是否会更改优化指令的顺序?

转载 作者:行者123 更新时间:2023-12-03 17:50:19 26 4
gpt4 key购买 nike

假设我有以下管道:

df.orderBy('foo').limit(10).show()

在这里我们可以看到 orderBy指令在前,所以数据帧的所有行都应该排在 limit 之前指令被执行。我发现自己在思考 Spark 是否会在管道内部进行一些“重组”以提高性能(例如,在 和 limit 之前执行 orderBy 指令 )。 Spark 能做到吗?

最佳答案

你的假设是正确的。 Spark 执行 sort然后 limit在合并/收集结果之前在每个分区上,我们将在接下来看到。

orderBy其次是 limit将导致下一次调用:

  • [ Dataset.scala ] 数据集:orderBy()
  • [ Dataset.scala ] 数据集:sortInternal()
  • [ SparkStrategies.scala ] 特殊限制:应用()
  • [ limit.scala ] TakeOrderedAndProjectExec:doExecute()

  • 通过查看 TakeOrderedAndProjectExec:doExecute()方法我们首先会遇到下一段代码:

    protected override def doExecute(): RDD[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val localTopK: RDD[InternalRow] = {
    child.execute().map(_.copy()).mapPartitions { iter =>
    org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
    }
    }

    ......


    在这里我们可以看到 localTopK通过获取 填充来自每个排序分区的 topK 第一条记录 .这意味着 Spark 尝试在分区级别尽快下推 topK 过滤器。

    接下来的几行:

    ....

    val shuffled = new ShuffledRowRDD(
    ShuffleExchangeExec.prepareShuffleDependency(
    localTopK,
    child.output,
    SinglePartition,
    serializer,
    writeMetrics),
    readMetrics)
    shuffled.mapPartitions { iter =>
    val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
    if (projectList != child.output) {
    val proj = UnsafeProjection.create(projectList, child.output)
    topK.map(r => proj(r))
    } else {
    topK
    }
    }

    将生成最终 ShuffledRowRDD来自所有分区,这些分区将包含组成 limit 的最终结果的最终 topK 排序记录。 .

    示例

    让我们通过一个例子来说明这一点。考虑范围为 1,2,3...20 的数据集它分为两部分。第一个包含奇数,而第二个包含偶数,如下所示:
    -----------   -----------
    | P1 | | P2 |
    ----------- -----------
    | 1 | | 2 |
    | 3 | | 4 |
    | 5 | | 6 |
    | 7 | | 8 |
    | 9 | | 10 |
    | .... | | .... |
    | 19 | | 20 |
    ----------- -----------

    df.orderBy(...).limit(5)执行 Spark 将从每个分区中获取前 5 个排序记录,即第一个分区的 1-9 和第二个分区的 2-10。然后它将合并和排序它们又名序列 1,2,3,4,5..10 .最后它将获得生成最终列表的前 5 条记录 1,2,3,4,5 .

    结论

    Spark 利用所有可用信息来处理 orderBy其次是 limit通过省略处理整个数据集,但只处理前 topK 行。正如@ShemTov 已经提到的,无需调用 limit之前 orderBy因为第一个会返回无效的数据集,第二个是因为 Spark 在内部为您做了所有必要的优化。

    关于python-3.x - pyspark 是否会更改优化指令的顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59195346/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com