gpt4 book ai didi

apache-spark - 过滤 DataFrame 最有效的方法是什么

转载 作者:行者123 更新时间:2023-12-03 07:27:08 26 4
gpt4 key购买 nike

...通过检查列的值是否在 seq 中。
也许我没有很好地解释它,我基本上想要这个(使用常规 SQL 表达它): DF_Column IN seq?

首先,我使用broadcast var(我放置序列的位置)、UDF(进行检查)和registerTempTable来完成此操作。
问题是我没有测试它,因为我遇到了 known bug显然,仅当将 registerTempTableScalaIDE 一起使用时才会出现。

我最终从 seq 中创建了一个新的 DataFrame 并与其进行内部连接(交集),但我怀疑这是完成任务的最有效的方法。

谢谢

编辑:(回复@YijieShen):
如何根据一个 DataFrame 列的元素是否在另一个 DF 的列中进行过滤(例如 SQL select * from A where login in (select username from B))?

例如:第一个 DF:

login      count
login1 192
login2 146
login3 72

第二个 DF:

username
login2
login3
login4

结果:

login      count
login2 146
login3 72

尝试:
EDIT-2:我认为,既然错误已修复,这些应该可以工作。 END EDIT-2

ordered.select("login").filter($"login".contains(empLogins("username")))

ordered.select("login").filter($"login" in empLogins("username"))

它们分别在线程“main”org.apache.spark.sql.AnalysisException中抛出异常:

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter Contains(login#8, username#10);

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter login#8 IN (username#10);

最佳答案

我的代码(遵循第一个方法的描述)在这两种配置的 Spark 1.4.0-SNAPSHOT 中正常运行:

  • Intellij IDEA的测试
  • Spark 独立集群,具有 8 个节点(1 个主节点,7 个工作节点)

请检查是否存在任何差异

val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")

val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))

xdf.show()
filtered.show()

输出

name cnt
login1 192
login2 146
login3 72

name cnt
login3 72

关于apache-spark - 过滤 DataFrame 最有效的方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29796928/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com