gpt4 book ai didi

apache-spark - Spark 动态 DAG 比硬编码的 DAG 慢很多

转载 作者:行者123 更新时间:2023-12-04 04:18:06 26 4
gpt4 key购买 nike

我在 spark 中有一个操作,应该对数据框中的几列执行。通常,有 2 种可能性来指定此类操作

  • 硬编码

  • handleBias("bar", df)
    .join(handleBias("baz", df), df.columns)
    .drop(columnsToDrop: _*).show
  • 从列名列表中动态生成它们

  • var isFirst = true
    var res = df
    for (col <- columnsToDrop ++ columnsToCode) {
    if (isFirst) {
    res = handleBias(col, res)
    isFirst = false
    } else {
    res = handleBias(col, res)
    }
    }
    res.drop(columnsToDrop: _*).show

    问题是动态生成的 DAG 是不同的,当使用更多列时,动态解决方案的运行时间比硬编码操作增加得更多。

    我很好奇如何 将动态结构的优雅与快速执行时间相结合 .

    这是示例代码的 DAG 的比较
    complexity comparison

    对于大约 80 列,这会为硬编码变体生成一个相当不错的图表
    hardCoded
    对于动态构造的查询,一个非常大的、可能不太并行且速度较慢的 DAG。
    hugeMessDynamic

    当前版本的 spark (2.0.2) 与 DataFrames 一起使用和 spark-sql

    完成最小示例的代码:

    def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
    val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

    val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

    df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
    }

    编辑

    使用 foldleft 运行您的任务生成线性 DAG
    foldleft
    并对所有列的函数进行硬编码导致
    hardcoded

    两者都比我原来的 DAG 好很多,但是,硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码的执行图,但这看起来相当难看。你看到任何其他选择吗?

    最佳答案

    编辑 1:从 handleBias 中删除了一个窗口函数并将其转换为广播连接。

    编辑 2:更改了空值的替换策略。

    我有一些建议可以改进您的代码。首先,对于“handleBias”函数,我会使用窗口函数和“withColumn”调用来完成,避免连接:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)
    val result = df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
    result
    }

    然后,对于多列调用它,我建议使用 foldLeft这是此类问题的“功能”方法:

    val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")

    val columnsToDrop = Seq("baz")
    val columnsToCode = Seq("bar", "baz")
    val target = "foo"

    val targetCounts = df.filter(df(target) === 1).groupBy(target)
    .agg(count(target).as("cnt_foo_eq_1"))
    val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

    val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) {
    (currentDF, colName) => handleBias(currentDF, colName)
    }

    result.drop(columnsToDrop:_*).show()

    +---+--------------------+------------------+--------+------------------+--------+
    |foo| bar| pre_baz|pre2_baz| pre_bar|pre2_bar|
    +---+--------------------+------------------+--------+------------------+--------+
    | 2| noValidFormat| 0.0| 2.0| 0.0| 2.0|
    | 1|lastAssumingSameDate|0.3333333333333333| 1.0|0.3333333333333333| 1.0|
    | 1| second|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
    | 1| first|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
    +---+--------------------+------------------+--------+------------------+--------+

    我不确定它是否会大大改善您的 DAG,但至少它使代码更清晰、更具可读性。

    引用:
  • 关于窗口函数的 Databricks 文章:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
  • 可用函数的 API 文档:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions $
  • 左折叠:https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright
  • 关于apache-spark - Spark 动态 DAG 比硬编码的 DAG 慢很多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41169873/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com