- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我已经搜索了很多简洁的答案,希望有人可以帮助我澄清数据 block 分区..
假设我有一个包含列的数据框:Year
, Month
, Day
, SalesAmount
, StoreNumber
我想按年和月进行分区存储..这样我就可以运行以下命令:
df.write.partitionBy('Year', 'Month').format('csv').save('/mnt/path/', header='true')
这将以以下格式输出数据:/path/Year=2019/Month=05/<file-0000x>.csv
如果我然后再次加载它,例如:
spark.read.format('csv').options(header='true').load('/mnt/path/').createOrReplaceTempView("temp1")
Q1:这还没有真正“读取”数据,对吗?即我可能有数十亿条记录..但直到我实际查询 temp1
,没有针对源执行任何操作?
Q2-A:随后,当使用 temp1
查询此数据时,我的假设是,如果我在 where 子句中包含分区中使用的项目,则会对从磁盘读取的实际文件应用智能过滤?
%sql
select * from temp1 where Year = 2019 and Month = 05 -- OPTIMAL
而以下内容不会执行任何文件过滤,因为它没有要查找的分区的上下文:
%sql
select * from temp1 where StoreNum = 152 and SalesAmount > 10000 -- SUB-OPTIMAL
Q2-B:最后,如果我以 Parquet 格式(而不是 *.csv)存储文件..上面的两个查询都会“下推”到存储的实际数据中。 .但也许以不同的方式?
即第一个仍会使用分区,但第二个 ( where StoreNum = 152 and SalesAmount > 10000
) 现在将使用 parquet 的列式存储?而 *.csv 没有这种优化?
任何人都可以澄清我对此的想法/理解吗?
资源链接也很棒..
最佳答案
A1:您对 createOrReplaceTempView
的评估是正确的。这将为当前 Spark session 延迟进行评估。换句话说,如果您终止 Spark session 而不访问它,则数据将永远不会传输到 temp1 中。
A2:让我们通过使用您的代码的示例来检查该案例。首先让我们保存您的数据:
df.write.mode("overwrite").option("header", "true")
.partitionBy("Year", "Month")
.format("csv")
.save("/tmp/partition_test1/")
然后加载它:
val df1 = spark.read.option("header", "true")
.csv("/tmp/partition_test1/")
.where($"Year" === 2019 && $"Month" === 5)
执行df1.explain
将返回:
== Physical Plan ==
*(1) FileScan csv [Day#328,SalesAmount#329,StoreNumber#330,Year#331,Month#332] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 0, Partition
Filters: [isnotnull(Year#331), isnotnull(Month#332), (Year#331 = 2019), (Month#332 = 5)], PushedFilters: [], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>
正如您所看到的,PushedFilters: []
数组为空,尽管 PartitionFilters[]
不是空的,这表明 Spark 能够对分区应用过滤,从而进行修剪不满足 where
语句的分区。
如果我们将 Spark 查询稍微更改为:
df1.where($"StoreNumber" === 1 && $"Year" === 2011 && $"Month" === 11).explain
== Physical Plan ==
*(1) Project [Day#462, SalesAmount#463, StoreNumber#464, Year#465, Month#466]
+- *(1) Filter (isnotnull(StoreNumber#464) && (cast(StoreNumber#464 as int) = 1))
+- *(1) FileScan csv [Day#462,SalesAmount#463,StoreNumber#464,Year#465,Month#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 1, Par
titionFilters: [isnotnull(Month#466), isnotnull(Year#465), (Year#465 = 2011), (Month#466 = 11)], PushedFilters: [IsNotNull(StoreNumber)], ReadSchema: struct<Day:string,SalesAmount:string,Store
Number:string>
现在,PartitionFilters
和 PushedFilters
都将最大限度地减少 Spark 工作负载。正如您所看到的,Spark 首先通过 PartitionFilters 识别现有分区,然后应用谓词下推,从而利用这两个过滤器。
完全相同的情况也适用于 parquet
文件,最大的区别是 parquet 将利用谓词下推过滤器,甚至更多地将它们与其内部基于柱状的系统结合起来(正如您已经提到的),该系统保留指标和对数据进行统计。因此,与 CSV 文件的区别在于,对于 CSV,当 Spark 读取/扫描 CSV 文件(排除不满足谓词下推条件的记录)时,将发生谓词下推。对于 parquet,谓词下推过滤器将传播到 parquet 内部系统,从而导致更大的数据修剪。
在您的情况下,从 createOrReplaceTempView
加载数据不会有所不同,并且执行计划将保持不变。
一些有用的链接:
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
https://www.waitingforcode.com/apache-spark-sql/predicate-pushdown-spark-sql/read
关于pyspark - 与谓词下推相关的数据 block 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56031691/
我在数据框中有一列月份数字,想将其更改为月份名称,所以我使用了这个: df['monthName'] = df['monthNumber'].apply(lambda x: calendar.mont
Pyspark 中是否有一个 input() 函数,我可以通过它获取控制台输入。如果是,请详细说明一下。 如何在 PySpark 中编写以下代码: directory_change = input("
我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有未与架构确认的数据。 stringDf = sparkSession.cr
我正在开发基于一组 ORC 文件的 spark 数据框的 sql 查询。程序是这样的: from pyspark.sql import SparkSession spark_session = Spa
我有一个 Pyspark 数据框( 原始数据框 )具有以下数据(所有列都有 字符串 数据类型): id Value 1 103 2
我有一台配置了Redis和Maven的服务器 然后我执行以下sparkSession spark = pyspark .sql .SparkSession .builder .master('loca
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有: +---+-------+-------+ | id| var1| var2| +---+-------+-------+ | a|[1,2,3]|[1,2,3]| | b|[2,
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有一个带有多个数字列的 pyspark DF,我想为每一列根据每个变量计算该行的十分位数或其他分位数等级。 这对 Pandas 来说很简单,因为我们可以使用 qcut 函数为每个变量创建一个新列,如
我有以下使用 pyspark.ml 包进行线性回归的代码。但是,当模型适合时,我在最后一行收到此错误消息: IllegalArgumentException: u'requirement failed
我有一个由 | 分隔的平面文件(管道),没有引号字符。示例数据如下所示: SOME_NUMBER|SOME_MULTILINE_STRING|SOME_STRING 23|multiline text
给定如下模式: root |-- first_name: string |-- last_name: string |-- degrees: array | |-- element: struc
我有一个 pyspark 数据框如下(这只是一个简化的例子,我的实际数据框有数百列): col1,col2,......,col_with_fix_header 1,2,.......,3 4,5,.
我有一个数据框 +------+--------------------+-----------------+---- | id| titulo |tipo | formac
我从 Spark 数组“df_spark”开始: from pyspark.sql import SparkSession import pandas as pd import numpy as np
如何根据行号/行索引值删除 Pyspark 中的行值? 我是 Pyspark(和编码)的新手——我尝试编码一些东西,但它不起作用。 最佳答案 您不能删除特定的列,但您可以使用 filter 或其别名
我有一个循环生成多个因子表的输出并将列名存储在列表中: | id | f_1a | f_2a | |:---|:----:|:-----| |1 |1.2 |0.95 | |2 |0.7
我正在尝试将 hql 脚本转换为 pyspark。我正在努力如何在 groupby 子句之后的聚合中实现 case when 语句的总和。例如。 dataframe1 = dataframe0.gro
我想添加新的 2 列值服务 arr 第一个和第二个值 但我收到错误: Field name should be String Literal, but it's 0; production_targe
我是一名优秀的程序员,十分优秀!