gpt4 book ai didi

pyspark - 与谓词下推相关的数据 block 分区

转载 作者:行者123 更新时间:2023-12-02 14:27:52 24 4
gpt4 key购买 nike

我已经搜索了很多简洁的答案,希望有人可以帮助我澄清数据 block 分区..

假设我有一个包含列的数据框:Year , Month , Day , SalesAmount , StoreNumber

我想按年和月进行分区存储..这样我就可以运行以下命令:

df.write.partitionBy('Year', 'Month').format('csv').save('/mnt/path/', header='true')

这将以以下格式输出数据:/path/Year=2019/Month=05/<file-0000x>.csv

如果我然后再次加载它,例如:

spark.read.format('csv').options(header='true').load('/mnt/path/').createOrReplaceTempView("temp1")

Q1:这还没有真正“读取”数据,对吗?即我可能有数十亿条记录..但直到我实际查询 temp1 ,没有针对源执行任何操作?

Q2-A:随后,当使用 temp1 查询此数据时,我的假设是,如果我在 where 子句中包含分区中使用的项目,则会对从磁盘读取的实际文件应用智能过滤?

%sql
select * from temp1 where Year = 2019 and Month = 05 -- OPTIMAL

而以下内容不会执行任何文件过滤,因为它没有要查找的分区的上下文:

%sql
select * from temp1 where StoreNum = 152 and SalesAmount > 10000 -- SUB-OPTIMAL

Q2-B:最后,如果我以 Parquet 格式(而不是 *.csv)存储文件..上面的两个查询都会“下推”到存储的实际数据中。 .但也许以不同的方式?

即第一个仍会使用分区,但第二个 ( where StoreNum = 152 and SalesAmount > 10000 ) 现在将使用 parquet 的列式存储?而 *.csv 没有这种优化?

任何人都可以澄清我对此的想法/理解吗?

资源链接也很棒..

最佳答案

A1:您对 createOrReplaceTempView 的评估是正确的。这将为当前 Spark session 延迟进行评估。换句话说,如果您终止 Spark session 而不访问它,则数据将永远不会传输到 temp1 中。

A2:让我们通过使用您的代码的示例来检查该案例。首先让我们保存您的数据:

df.write.mode("overwrite").option("header", "true")
.partitionBy("Year", "Month")
.format("csv")
.save("/tmp/partition_test1/")

然后加载它:

val df1 = spark.read.option("header", "true")
.csv("/tmp/partition_test1/")
.where($"Year" === 2019 && $"Month" === 5)

执行df1.explain将返回:

== Physical Plan ==
*(1) FileScan csv [Day#328,SalesAmount#329,StoreNumber#330,Year#331,Month#332] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 0, Partition
Filters: [isnotnull(Year#331), isnotnull(Month#332), (Year#331 = 2019), (Month#332 = 5)], PushedFilters: [], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

正如您所看到的,PushedFilters: [] 数组为空,尽管 PartitionFilters[] 不是空的,这表明 Spark 能够对分区应用过滤,从而进行修剪不满足 where 语句的分区。

如果我们将 Spark 查询稍微更改为:

df1.where($"StoreNumber" === 1 && $"Year" === 2011 && $"Month" === 11).explain

== Physical Plan ==
*(1) Project [Day#462, SalesAmount#463, StoreNumber#464, Year#465, Month#466]
+- *(1) Filter (isnotnull(StoreNumber#464) && (cast(StoreNumber#464 as int) = 1))
+- *(1) FileScan csv [Day#462,SalesAmount#463,StoreNumber#464,Year#465,Month#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 1, Par
titionFilters: [isnotnull(Month#466), isnotnull(Year#465), (Year#465 = 2011), (Month#466 = 11)], PushedFilters: [IsNotNull(StoreNumber)], ReadSchema: struct<Day:string,SalesAmount:string,Store
Number:string>

现在,PartitionFiltersPushedFilters 都将最大限度地减少 Spark 工作负载。正如您所看到的,Spark 首先通过 PartitionFilters 识别现有分区,然后应用谓词下推,从而利用这两个过滤器。

完全相同的情况也适用于 parquet 文件,最大的区别是 parquet 将利用谓词下推过滤器,甚至更多地将它们与其内部基于柱状的系统结合起来(正如您已经提到的),该系统保留指标和对数据进行统计。因此,与 CSV 文件的区别在于,对于 CSV,当 Spark 读取/扫描 CSV 文件(排除不满足谓词下推条件的记录)时,将发生谓词下推。对于 parquet,谓词下推过滤器将传播到 parquet 内部系统,从而导致更大的数据修剪。

在您的情况下,从 createOrReplaceTempView 加载数据不会有所不同,并且执行计划将保持不变。

一些有用的链接:

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

https://www.waitingforcode.com/apache-spark-sql/predicate-pushdown-spark-sql/read

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkStrategy-FileSourceStrategy.html

关于pyspark - 与谓词下推相关的数据 block 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56031691/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com