gpt4 book ai didi

scala - Spark DataFrame 中 bool 表达式的动态评估

转载 作者:行者123 更新时间:2023-12-05 05:33:20 24 4
gpt4 key购买 nike

假设我有一个 Spark DataFrame(在 Scala 中)

+---+---+---------------+
| a| b| expr|
+---+---+---------------+
| 0| 0|a = 1 AND b = 0|
| 0| 1| a = 0|
| 1| 0|a = 1 AND b = 1|
| 1| 1|a = 1 AND b = 1|
| 1| 1| null|
| 1| 1| a = 0 OR b = 1|
+---+---+---------------+

其中字符串列 expr 包含可空的 bool 表达式,这些表达式引用同一 DataFrame 中的其他数字列(ab) .

我想派生一个列eval(expr),它按行计算 bool 表达式expr,即

+---+---+---------------+----------+
| a| b| expr|eval(expr)|
+---+---+---------------+----------+
| 0| 0|a = 1 AND b = 0| false|
| 0| 1| a = 0| true|
| 1| 0|a = 1 AND b = 1| false|
| 1| 1|a = 1 AND b = 1| true|
| 1| 1| null| true|
| 1| 1| a = 0 OR b = 1| true|
+---+---+---------------+----------+

(特别是,尽管这是一个可选规范,null 的计算结果为 true)。

问题

创建 eval(expr) 的最佳方法是什么?

也就是说,我如何在 Spark DataFrame 中创建一个列来评估引用 DataFrame 中其他列的 bool 表达式列?


我在下面有一些不太令人满意的解决方案。他们在范围内假设以下 DataFrame:

val df: DataFrame = Seq(
(0, 0, "a = 1 AND b = 0"),
(0, 1, "a = 0"),
(1, 0, "a = 1 AND b = 1"),
(1, 1, "a = 1 AND b = 1"),
(1, 1, null),
(1, 1, "a = 0 OR b = 1")
).toDF("a", "b", "expr")

解决方案一

从单个表达式创建一个大型全局表达式:

val exprs: Column = concat(
df.columns
.filter(_ != "expr")
.zipWithIndex
.flatMap {
case (name, i) =>
if (i == 0)
Seq(lit(s"($name = "), col(name))
else
Seq(lit(s" AND $name = "), col(name))
} :+ lit(" AND (") :+ col("expr") :+ lit("))"): _*
)
// exprs: org.apache.spark.sql.Column = concat((a = , a, AND b = , b, AND (, expr, )))

val bigExprString = df.select(exprs).na.drop.as[String].collect.mkString(" OR ")
// bigExprString: String = (a = 0 AND b = 0 AND (a = 1 AND b = 0)) OR (a = 0 AND b = 1 AND (a = 0)) OR (a = 1 AND b = 0 AND (a = 1 AND b = 1)) OR (a = 1 AND b = 1 AND (a = 1 AND b = 1)) OR (a = 1 AND b = 1 AND (a = 0 OR b = 1))

val result: DataFrame = df.withColumn("eval(expr)", expr(bigExprString))

这里的缺点是生成的字符串非常大。在我的实际用例中,它可能有数万个字符长,甚至更长。我不太确定这是否会导致问题。

解决方案2

根据表达式列的值将DataFrame拆分成多个,分别对每个DataFrame进行操作,重新组合成一个DataFrame。

val exprs: Seq[String] = df.select("expr").distinct.as[String].collect
// exprs: Seq[String] = WrappedArray(a = 1 AND b = 1, a = 1 AND b = 0, null, a = 0, a = 0 OR b = 1)

val result: DataFrame = exprs.map(e =>
df.filter(col("expr") === e)
.withColumn("eval(expr)", if (e == null) lit(true) else when(expr(e), true).otherwise(false))
).reduce(_.union(_))
.show()

我认为这种方法的缺点是它创建了许多中间表(每个不同的表达式一个)。在我的实际用例中,这个数量可能是数百或数千。

最佳答案

使用 this answer scala.tools.reflect.ToolBox可用于在将表达式转换为有效的 Scala 表达式后对其求值:

case class Result(a: Integer, b: Integer, expr: String, result: Boolean)

df.mapPartitions(it => {
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()
val res = it.map(r => {
val a = r.getInt(0)
val b = r.getInt(1)
val expr = r.getString(2)
val exprResult =
if ( expr == null) {
true
}
else {
val scalaExpr = expr.replace("=", "==").replace("AND", "&").replace("OR", "|")
val scalaExpr2 = s"var a=${a}; var b=${b}; ${scalaExpr}"
tb.eval(tb.parse(scalaExpr2)).asInstanceOf[Boolean]
}
Result(a, b, expr, exprResult)
})
res
}).show()

输出:

+---+---+---------------+------+
| a| b| expr|result|
+---+---+---------------+------+
| 0| 0|a = 1 AND b = 0| false|
| 0| 1| a = 0| true|
| 1| 0|a = 1 AND b = 1| false|
| 1| 1|a = 1 AND b = 1| true|
| 1| 1| null| true|
| 1| 1| a = 0 OR b = 1| true|
+---+---+---------------+------+

我在这里使用 mapPartitions 而不是简单的 udf,因为工具箱的初始化需要一些时间。它不是每行初始化一次,现在每个分区只初始化一次。

关于scala - Spark DataFrame 中 bool 表达式的动态评估,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73885497/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com