gpt4 book ai didi

apache-spark - 使用来自另一个数据框的谓词下推过滤数据框

转载 作者:行者123 更新时间:2023-12-05 07:35:12 24 4
gpt4 key购买 nike

如何根据我拥有的另一个数据帧将过滤器下推到数据帧读取?基本上想避免完全读取第二个数据帧然后进行内部连接。相反,我只想提交一个阅读过滤器以从源头过滤。即使我使用包含读取的内部连接,该计划也不会显示它正在被过滤。我觉得肯定有更好的方法来设置它。使用 Spark 2.x 到目前为止我有这个但我想避免收集如下列表:

//  Don't want to do this collect...too slow
val idFilter = df1.select("id").distinct().map(r => r.getLong(0)).collect.toList
val df2: DataFrame = spark.read.format("parquet").load("<path>")
.filter($"id".isin(idFilter: _*))

最佳答案

除非您自己实现数据源,否则您不能直接使用谓词下推。 Predicate Pushdown 是 Spark Datasources 提供的一种机制,必须由每个 Datasources 单独实现。

对于基于文件的数据源,已经有一个基于磁盘分区的简单机制。

考虑以下 DataFrame:

val df = Seq(("test", "day1"), ("test2", "day2")).toDF("data", "day")

如果我们通过以下方式将该 DataFrame 保存到磁盘:

df.write.partitionBy("day").save("/tmp/data")

结果会是下面的文件夹结构

tmp -
|
| - data - |
|
|--day=day1 -|- part1....parquet
| |- part2....parquet
|
|--day=day2 -|- part1....parquet
|- part2....parquet

如果您现在像这样使用此数据源:

spark.read.load("/tmp/data").filter($"day" = "day1").show()

Spark 甚至不会加载文件夹 day2 的数据,因为不需要它。

这是一种谓词下推,适用于 spark 支持的每种标准文件格式。

一个更具体的机制是 Parquet 。 Parquet 是一种基于列的文件格式,这意味着它很容易过滤掉列。如果您在文件 /tmp/myparquet.parquet 中有包含 3 列 abc 的基于 Parquet 的文件> 以下查询:

spark.read.parquet("/tmp/myparquet.parquet").select("a").show()

将导致内部谓词下推,其中 spark 仅获取列 a 的数据,而不读取列 bc 的数据。

如果有人对通过实现这个特性来建立机制感兴趣:

/**
* A BaseRelation that can eliminate unneeded columns and filter using selected
* predicates before producing an RDD containing all matching tuples as Row objects.
*
* The actual filter should be the conjunction of all `filters`,
* i.e. they should be "and" together.
*
* The pushed down filters are currently purely an optimization as they will all be evaluated
* again. This means it is safe to use them with methods that produce false positives such
* as filtering partitions based on a bloom filter.
*
* @since 1.3.0
*/
@Stable
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

可在 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 中找到

关于apache-spark - 使用来自另一个数据框的谓词下推过滤数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49677671/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com