gpt4 book ai didi

apache-spark - 如何将 PySpark 中的数据帧/RDD 以 CSV/Parquet 文件的形式快速保存到磁盘?

转载 作者:行者123 更新时间:2023-12-04 04:21:32 29 4
gpt4 key购买 nike

我有一个 Google Dataproc 集群正在运行,我正在向它提交一个 PySpark 作业,该作业从 Google Cloud Storage(945MB CSV 文件,400 万行 -> 总共需要 48 秒读取)到 PySpark 数据框并应用该数据帧的函数( parsed_dataframe = raw_dataframe.rdd.map(parse_user_agents).toDF() --> 大约需要 4 或 5 秒)。

然后,我必须将这些修改后的结果作为 GZIP 的 CSV 或 Parquet 文件保存回 Google Cloud Storage。我也可以将这些修改后的结果保存在本地,然后将它们复制到 GCS 存储桶中。

我通过 parsed_dataframe = parsed_dataframe.repartition(15) 重新分区数据帧然后尝试通过以下方式保存该新数据帧
parsed_dataframe.write.parquet("gs://somefolder/proto.parquet")parsed_dataframe.write.format("com.databricks.spark.csv").save("gs://somefolder/", header="true")parsed_dataframe.write.format("com.databricks.spark.csv").options(codec="org.apache.hadoop.io.compress.GzipCodec").save("gs://nyt_regi_usage/2017/max_0722/regi_usage/", header="true")
对于 400 万行 (945 MB),这些方法中的每一种(以及它们具有较低/较高分区的不同变体,以及在本地保存与在 GCS 上保存)都需要超过 60 分钟,这是相当长的时间。

我怎样才能优化这个/更快地保存数据?

值得注意的是,Dataproc Cluster 和 GCS bucket 都在同一个 region/zone,并且 Cluster 有一个 n1-highmem-8 (8CPU,52GB 内存)具有 15 个以上工作节点的主节点(只是我仍在测试的变量)

最佳答案

这里有一些危险信号。

1)作为DF读取然后转换为RDD进行处理并单独返回DF是非常低效的。通过恢复到 RDD,您将失去催化剂和钨优化。尝试更改您的函数以在 DF 中工作。

2) 重新分区强制洗牌,但更重要的是,计算现在将仅限于控制 15 个分区的那些执行程序。如果您的执行程序很大(7 核,40 左右 GB RAM),这可能不是问题。

如果在重新分区之前写入输出会发生什么?

请提供更多代码并最好触发 UI 输出以显示作业中的每个步骤需要多长时间。

关于apache-spark - 如何将 PySpark 中的数据帧/RDD 以 CSV/Parquet 文件的形式快速保存到磁盘?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45425786/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com