gpt4 book ai didi

apache-spark - 优化从 s3 存储桶中的分区 Parquet 文件读取

转载 作者:行者123 更新时间:2023-12-04 15:17:23 57 4
gpt4 key购买 nike

我有一个 parquet 格式的大型数据集(大小约 1TB),分为 2 个层次结构:CLASSDATE只有7个类(class)。但是日期从 2020-01-01 开始不断增加。我的数据首先按 CLASS 分区,然后按 DATE

所以像这样:

CLASS1---DATE 1
---DATE 2
--- .
--- .
--- .
---DATE N

CLASS2---DATE 1
---DATE 2
--- .
--- .
--- .
---DATE N

我在 for 循环中按 CLASS 加载我的数据。如果我加载整个 parquet 文件,YARN 会终止作业,因为它会使内存实例过载。但是自从我在建模中进行百分位数计算以来,我一直在加载。此方法大约需要 23 小时才能完成。

但是,如果我重新分区,使我只有 CLASS 分区,则工作大约需要 10 个小时。有太多的子分区会减慢 Spark 执行器的工作吗?我将分区层次结构保持为 CLASS -> DATE 只是因为我需要每天在 DATE 之前追加新数据。如果只有 1 个分区效率更高,那么我必须每天在加载新数据后重新分区到 CLASS 分区。有人可以解释为什么单个分区工作得更快吗?如果是这样,通过附加而不重新分区整个数据集来每天对数据进行分区的最佳方法是什么?

谢谢

编辑:我在文件结构上使用 for 循环按 CLASS 分区循环,如下所示:

fs = s3fs.S3FileSystem(anon=False)    
inpath="s3://bucket/file.parquet/"

Dirs= fs.ls(inpath)
for paths in Dirs:
customPath='s3://' + uvapath + '/'
class=uvapath.split('=')[1]
df=spark.read.parquet(customPath)
outpath="s3://bucket/Output_" + class + ".parquet"
#Perform calculations
df.write.mode('overwrite').parquet(outpath)

加载的 df 将包含 CLASS=1 的所有日期。然后我将文件输出为每个 CLASS 的单独 Parquet 文件,这样我就有 7 个 Parquet 文件:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

然后我将 7 个 Parquet 合并成一个 Parquet ,这不是问题,因为生成的 Parquet 文件要小得多。

最佳答案

我有包含年、月和 ID 三列的分区数据。文件夹路径层次为

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

我可以通过加载根路径来读取 DataFrame。

val df = spark.read.parquet("s3://mybucket/")

然后,分区列会自动添加到 DataFrame 中。现在,您可以按照以下方式过滤分区列的数据

val df_filtered = df.filter("year = '2020' and month = '09'")

并使用 df_filtered 做一些事情,然后 spark 将只使用分区数据!


对于你的重复处理,你可以使用spark的fair scheduler。使用以下代码将 fair.xml 文件添加到项目的 src/main/resources 中,

<?xml version="1.0"?>

<allocations>
<pool name="fair">
<schedulingMode>FAIR</schedulingMode>
<weight>10</weight>
<minShare>0</minShare>
</pool>
</allocations>

并在创建 spark session 后设置 spark 配置。

spark.sparkContext.setLocalProperty("spark.scheduler.mode", "FAIR")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

然后您可以并行完成您的工作。您可能希望根据类并行化作业,所以

val classes = (1 to 7).par
val date = '2020-09-25'

classes foreach { case i =>

val df_filtered = df.filter(s"CLASS == '$i' and DATE = '$date'")

// Do your job

}

代码将同时使用不同的 CLASS 值。

关于apache-spark - 优化从 s3 存储桶中的分区 Parquet 文件读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64064587/

57 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com