
Scala Spark: using expr to evaluate an expression stored inside a column


I need to add a new Boolean column to a DataFrame, where the value is obtained by evaluating an expression stored in another column of that same DataFrame. For example, I have this DataFrame:

+----+----+----+----+----+-----------+----------------+
|colA|colB|colC|colD|colE|colPRODRTCE|         colCOND|
+----+----+----+----+----+-----------+----------------+
|   1|   1|   1|   1|   3|         39|colA=1 && colB>0|
|   1|   1|   1|   1|   3|         45|          colD=1|
|   1|   1|   1|   1|   3|        447|colA>8 && colC=1|
+----+----+----+----+----+-----------+----------------+

In my new column I need to evaluate whether the expression in colCOND is true or false.

It is easy if the expression is the same for every row, for example:
val df = List(
  (1, 1, 1, 1, 3),
  (2, 2, 3, 4, 4)
).toDF("colA", "colB", "colC", "colD", "colE")

val myExpression = "colA<colC"

import org.apache.spark.sql.functions.expr

df.withColumn("colRESULT",expr(myExpression)).show()

+----+----+----+----+----+---------+
|colA|colB|colC|colD|colE|colRESULT|
+----+----+----+----+----+---------+
|   1|   1|   1|   1|   3|    false|
|   2|   2|   3|   4|   4|     true|
+----+----+----+----+----+---------+

But in my case I have to evaluate a different expression for each row, and that expression is stored in the colCOND column.

I thought about creating a UDF that takes all the columns, but my real DataFrame has a lot of columns. How can I do this?

Thanks, everyone.

Best answer

If && is changed to AND, you can try the following:

package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

object DataFrameLogicWithColumn extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val sourceDF = Seq(
    (1, 1, 1, 1, 3, 39,  "colA=1 AND colB>0"),
    (1, 1, 1, 1, 3, 45,  "colD=1"),
    (1, 1, 1, 1, 3, 447, "colA>8 AND colC=1")
  ).toDF("colA", "colB", "colC", "colD", "colE", "colPRODRTCE", "colCOND")
    .persist(MEMORY_AND_DISK)

  // Collect the distinct expressions stored in colCOND.
  val exprs = sourceDF.select('colCOND).distinct().as[String].collect()

  // Evaluate each expression with expr() on the rows that carry it.
  val d1 = exprs.map { i =>
    val df = sourceDF.filter('colCOND.equalTo(i))
    df.withColumn("colRESULT", expr(i))
  }

  // Union the per-expression results back into a single DataFrame.
  val resultDF = d1.reduce(_ union _)

  resultDF.show(false)
  // +----+----+----+----+----+-----------+-----------------+---------+
  // |colA|colB|colC|colD|colE|colPRODRTCE|colCOND          |colRESULT|
  // +----+----+----+----+----+-----------+-----------------+---------+
  // |1   |1   |1   |1   |3   |39         |colA=1 AND colB>0|true     |
  // |1   |1   |1   |1   |3   |447        |colA>8 AND colC=1|false    |
  // |1   |1   |1   |1   |3   |45         |colD=1           |true     |
  // +----+----+----+----+----+-----------+-----------------+---------+

  sourceDF.unpersist()
}
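
The code above assumes colCOND already contains Spark SQL syntax (AND instead of &&). If the raw data looks like the question, with &&, one possible preprocessing step is to rewrite the operator with regexp_replace before collecting the expressions (a minimal sketch; normalizedDF is just an illustrative name, not part of the original answer):

import org.apache.spark.sql.functions.regexp_replace

// Rewrite the question's "&&" into Spark SQL's AND so that expr() can parse it.
val normalizedDF = sourceDF
  .withColumn("colCOND", regexp_replace($"colCOND", "&&", "AND"))

The distinct/filter/expr/union steps can then be run on normalizedDF instead of sourceDF.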
You can also try it with a Dataset:
case class c1(colA: Int, colB: Int, colC: Int, colD: Int, colE: Int, colPRODRTCE: Int, colCOND: String)

case class cRes(colA: Int, colB: Int, colC: Int, colD: Int, colE: Int, colPRODRTCE: Int, colCOND: String, colResult: Boolean)

val sourceData = Seq(
  c1(1, 1, 1, 1, 3, 39,  "colA=1 AND colB>0"),
  c1(1, 1, 1, 1, 3, 45,  "colD=1"),
  c1(1, 1, 1, 1, 3, 447, "colA>8 AND colC=1")
).toDS()

// We need to parse the expression stored in colCOND ourselves here,
// matching each known expression against the row's values.
def f2(a: c1): Boolean = {
  a.colCOND match {
    case "colA=1 AND colB>0" => a.colA == 1 && a.colB > 0
    case _ => false
  }
}

val res2 = sourceData
  .map(i => cRes(i.colA, i.colB, i.colC, i.colD, i.colE, i.colPRODRTCE, i.colCOND, f2(i)))
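
As a quick usage check (a minimal sketch, not part of the original answer), the resulting Dataset can be displayed like any DataFrame. Note that f2 only matches the first expression explicitly, so every other condition falls through to false:

// Only "colA=1 AND colB>0" is handled by f2's pattern match;
// the remaining rows hit the catch-all case and get false.
res2.show(false)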

Regarding "Scala Spark: using expr to evaluate an expression stored inside a column", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54647979/
