gpt4 book ai didi

scala - Spark DataFrame 过滤 : retain element belonging to a list

转载 作者:行者123 更新时间:2023-12-04 23:18:32 25 4
gpt4 key购买 nike

我在 Zeppelin 笔记本上使用 Spark 1.5.1 和 Scala。

  • 我有一个 DataFrame,其中有一列名为 userID 的 Long 类型。
  • 我总共有大约 400 万行和 200,000 个唯一用户 ID。
  • 我还有一个要排除的 50,000 个用户 ID 的列表。
  • 我可以轻松构建要保留的用户 ID 列表。

  • 删除属于要排除的用户的所有行的最佳方法是什么?

    提出相同问题的另一种方法是:保留属于用户的行的最佳方法是什么?

    我看到了 this post并应用了它的解决方案(参见下面的代码),但执行速度很慢,因为我知道我在本地机器上运行 SPARK 1.5.1,我有 16GB 的 RAM 内存,初始 DataFrame 适合内存。

    这是我正在申请的代码:
    import org.apache.spark.sql.functions.lit
    val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))

    在上面的代码中:
  • initialDataFrame 有 3885068 行,每行有 5 列,其中一列称为 userID,它包含 Long 值。
  • listOfUsersToKeep 是一个 Array[Long],它包含 150,000 个 Long 用户 ID。

  • 我想知道是否有比我正在使用的解决方案更有效的解决方案。

    谢谢

    最佳答案

    您可以使用 join :

    val usersToKeep = sc.parallelize(
    listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")

    val finalDataFrame = usersToKeep
    .join(initialDataFrame, $"userID" === $"userID_")
    .drop("userID_")

    或广播变量和 UDF:
    import org.apache.spark.sql.functions.udf

    val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
    val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
    val finalDataFrame = initialDataFrame.where(checkUser($"userID"))

    也应该可以广播一个 DataFrame:
    import org.apache.spark.sql.functions.broadcast

    initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")

    关于scala - Spark DataFrame 过滤 : retain element belonging to a list,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33824933/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com