gpt4 book ai didi

scala - 编写 spark UDF(而不是将 UDF 作为一个)时是否会降低性能?

转载 作者:行者123 更新时间:2023-12-01 09:04:09 27 4
gpt4 key购买 nike

我想知道编写 spark udf 是否会降低性能。一般来说,我更喜欢组合做一件事的小函数……

这是一个简单的例子,给定一个 DataFrame df:

def inc = udf( (i: Double) => i + 1)
def double = udf( (i: Double) => i * 2)
df.withColumn("r", double(inc($"c")))

相对
def incAndDouble = udf( (i: Double) => (i + 1) * 2)
df.withColumn("r", incAndDouble($"c")

从我所见,这个简单示例的性能是相同的。

你能解释一下为什么吗? Spark 在幕后如何运作?

它总是真的吗?

[更新]

当可以进行巧妙的组合(不仅仅是简单的函数组合)时,我可能会有一个反例,如下例所示
def filter = udf((s: Seq[String]) => s.startsWith("A"))
def size = udf((s: Seq[String]) => s.size)

val filterAndSize = udf((s: Seq[String]) => s.count(_.startsWith("A")))

所以,我猜 filterAndSize 更可取,因为它会避免一些中间集合实例化。

最佳答案

TL;博士 可能会有一些性能下降或惩罚,但可以忽略不计。

Can you explain why ?



用“explain”看到您的问题非常有趣,这正是用于查看在 Spark SQL 的掩护下发生的事情以及它如何执行查询的方法的名称:)

所以,使用 Dataset.explain甚至更详细的版本 Dataset.explain(extended = true)查看所有优化(以及可能的性能下降)。
def inc = udf( (i: Double) => i + 1)
def double = udf( (i: Double) => i * 2)

val df = Seq(1,2,3).toDF("c")
val q = df.withColumn("r", double(inc($"c")))

由两个 UDF 组成的计划如下所示。
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [c#3, UDF(UDF('c)) AS r#10]
+- AnalysisBarrier Project [value#1 AS c#3]

== Analyzed Logical Plan ==
c: int, r: double
Project [c#3, if (isnull(if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double)))) null else UDF(if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double))) AS r#10]
+- Project [value#1 AS c#3]
+- LocalRelation [value#1]

== Optimized Logical Plan ==
LocalRelation [c#3, r#10]

== Physical Plan ==
LocalTableScan [c#3, r#10]

让我们看看计划如何使用一个 UDF,它是两个 UDF 的组合。
def incAndDouble = udf( (i: Double) => (i + 1) * 2)
val q = df.withColumn("r", incAndDouble($"c"))
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [c#3, UDF('c) AS r#16]
+- AnalysisBarrier Project [value#1 AS c#3]

== Analyzed Logical Plan ==
c: int, r: double
Project [c#3, if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double)) AS r#16]
+- Project [value#1 AS c#3]
+- LocalRelation [value#1]

== Optimized Logical Plan ==
LocalRelation [c#3, r#16]

== Physical Plan ==
LocalTableScan [c#3, r#16]

在这种特殊情况下,没有区别,因为查询中的物理计划相同,即 LocalTableScan .

它可能与其他数据源(如文件或 JDBC)不同,但我个人的建议是开发尽可能小的 UDF,因为它们是 Spark 优化器的黑盒。

Is it always true ?



不,完全不是,因为它在很大程度上取决于您在 UDF 中所做的事情(但这与是否首先编写 UDF 有更多关系)。

在以下 UDF 是谓词的情况下(即返回 bool 值):
def filter = udf((s: Seq[String]) => s.startsWith("A"))

Spark 可以优化 UDF 的使用(如果它是 不是 UDF 而是一个简单的 filter 操作)并将其下推到数据源以加载更少的数据。这可能会对性能产生巨大影响。

关于scala - 编写 spark UDF(而不是将 UDF 作为一个)时是否会降低性能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47702789/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com