gpt4 book ai didi

scala - 将列和映射传递给 Scala UDF

转载 作者:行者123 更新时间:2023-12-04 10:28:34 27 4
gpt4 key购买 nike

我来自 Pyspark。我知道如何在 Pyspark 中做到这一点,但还没有在 Scala 中做到同样的事情。

这是一个数据框,

val df = Seq(
("u1", Array[Int](2,3,4)),
("u2", Array[Int](7,8,9))
).toDF("id", "mylist")


// +---+---------+
// | id| mylist|
// +---+---------+
// | u1|[2, 3, 4]|
// | u2|[7, 8, 9]|
// +---+---------+

这是一个 Map 对象,
val myMap = (1 to 4).toList.map(x => (x,0)).toMap

//myMap: scala.collection.immutable.Map[Int,Int] = Map(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0)

所以这张 map 的键值从 1 到 4。

对于 df 的每一行,我想检查“mylist”中的任何元素是否作为键值包含在 myMap 中。如果 myMap 包含一个元素,则返回该元素(如果包含多个元素,则返回任何一个),否则返回 -1。

所以结果应该是这样的
    +---+---------+-------+
| id| mylist| label|
+---+---------+-------+
| u1|[2, 3, 4]| 2 |
| u2|[7, 8, 9]| -1 |
+---+---------+-------+

我尝试了以下方法:
  • 下面的函数适用于数组对象,但不适用于列:
  • def list2label(ls: Array[Int],
    m: Map[Int, Int]):(Int) = {
    var flag = 0
    for (element <- ls) {
    if (m.contains(element)) flag = element
    }
    flag
    }

    val testls = Array[Int](2,3,4)
    list2label(testls, myMap)

    //testls: Array[Int] = Array(2, 3, 4)
    //res33: Int = 4
  • 尝试使用 UDF,但出现错误:
  • def list2label_udf(m: Map[Int, Int]) = udf( (ls: Array[Int]) =>(

    var flag = 0
    for (element <- ls) {
    if (m.contains(element)) flag = element
    }
    flag
    )
    )

    //<console>:3: error: illegal start of simple expression
    // var flag = 0
    // ^

    我认为我的 udf 格式错误..
  • 在 Pyspark 中,我可以随心所欲地执行此操作:
  • %pyspark

    myDict={1:0, 2:0, 3:0, 4:0}

    def list2label(ls, myDict):
    for i in ls:
    if i in dict3:
    return i
    return 0

    def list2label_UDF(myDict):
    return udf(lambda c: list2label(c,myDict))

    df = df.withColumn("label",list2label_UDF(myDict)(col("mylist")))

    任何帮助,将不胜感激!

    最佳答案

    解决方法如下图:

      scala> df.show
    +---+---------+
    | id| mylist|
    +---+---------+
    | u1|[2, 3, 4]|
    | u2|[7, 8, 9]|
    +---+---------+


    scala> def customUdf(m: Map[Int,Int]) = udf((s: Seq[Int]) => {
    val intersection = s.toList.intersect(m.keys.toList)
    if(intersection.isEmpty) -1 else intersection(0)})

    customUdf: (m: Map[Int,Int])org.apache.spark.sql.expressions.UserDefinedFunction

    scala> df.select($"id", $"myList", customUdf(myMap)($"myList").as("new_col")).show
    +---+---------+-------+
    | id| myList|new_col|
    +---+---------+-------+
    | u1|[2, 3, 4]| 2|
    | u2|[7, 8, 9]| -1|
    +---+---------+-------+

    另一种方法可能是发送映射的键列表而不是映射本身,因为 ypu 只检查键。为此,解决方案如下:
    scala> def customUdf1(m: List[Int]) = udf((s: Seq[Int]) => {
    val intersection = s.toList.intersect(m)
    if(intersection.isEmpty) -1 else intersection(0)})

    customUdf1: (m: List[Int])org.apache.spark.sql.expressions.UserDefinedFunction

    scala> df.select($"id",$"myList", customUdf1(myMap.keys.toList)($"myList").as("new_col")).show
    +---+---------+-------+
    | id| myList|new_col|
    +---+---------+-------+
    | u1|[2, 3, 4]| 2|
    | u2|[7, 8, 9]| -1|
    +---+---------+-------+

    让我知道它是否有帮助!!

    关于scala - 将列和映射传递给 Scala UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60525694/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com