gpt4 book ai didi

apache-spark - pyspark:有效地让partitionBy写入与原始表相同数量的总分区

转载 作者:行者123 更新时间:2023-12-03 11:27:57 26 4
gpt4 key购买 nike

我有一个与 pyspark 的 repartitionBy() 相关的问题。我最初在 this question 的评论中发布的功能.我被要求将它作为一个单独的问题发布,所以这里是:

我了解 df.partitionBy(COL)将写入每个值为 COL 的所有行到他们自己的文件夹,并且每个文件夹将(假设行之前通过其他键分布在所有分区中)具有与以前在整个表中大致相同数量的文件。我觉得这种行为很烦人。如果我有一个有 500 个分区的大表,我使用 partitionBy(COL)在某些属性列上,我现在有例如 100 个文件夹,每个文件夹包含 500 个(现在非常小)文件。

我想要的是partitionBy(COL)行为,但文件大小和文件数量与我最初的大致相同。

作为演示,上一个问题分享了一个玩具示例,其中您有一个包含 10 个分区的表并执行 partitionBy(dayOfWeek)现在你有 70 个文件,因为每个文件夹中有 10 个。我想要大约 10 个文件,每天一个,对于有更多数据的日子可能需要 2 或 3 个。

这很容易实现吗?像 df.write().repartition(COL).partitionBy(COL)看起来它可能会起作用,但我担心(在即将被划分为许多文件夹的非常大的表的情况下)必须先将它组合到一些少量的分区中,然后再执行 partitionBy(COL)似乎是个坏主意。

任何建议都非常感谢!

最佳答案

你有几个选择。在我下面的代码中,我假设你想用 Parquet 写,但当然你可以改变它。

(1) df.repartition(numPartitions, *cols).write.partitionBy(*cols).parquet(writePath)

这将首先使用基于散列的分区来确保来自 COL 的有限数量的值进入每个分区。取决于您为 numPartitions 选择的值, 一些分区可能是空的,而另一些可能会挤满值——对于不知道为什么的人,请阅读 this .然后,当您调用 partitionBy在 DataFrameWriter 上,每个分区中的每个唯一值都将放置在其自己的单独文件中。

警告:这种方法可能导致不平衡的分区大小和不平衡的任务执行时间。当您的列中的值与许多行相关联时会发生这种情况(例如,城市列 - 纽约市的文件可能有很多行),而其他值较少(例如,小城镇的值)。

(2) df.sort(sortCols).write.parquet(writePath)

当您希望 (1) 您写入的文件大小几乎相等 (2) 精确控制写入的文件数量时,此选项非常有用。这种方法首先对您的数据进行全局排序,然后找到将数据分解为 k 的拆分。大小均匀的分区,其中 k在 spark 配置中指定 spark.sql.shuffle.partitions .这意味着具有相同排序键值的所有值彼此相邻,但有时它们会跨越一个拆分,并位于不同的文件中。如果您的用例要求具有相同键的所有行位于同一分区中,则不要使用这种方法。

还有两个额外的好处:(1)通过对数据进行排序,它在磁盘上的大小通常可以减少(例如,按 user_id 然后按时间对所有事件进行排序会导致列值出现大量重复,这有助于压缩)和(2 ) 如果您写入支持它的文件格式(如 Parquet),则后续读取器可以通过使用谓词下推以最佳方式读取数据,因为 Parquet 写入器将写入元数据中每列的 MAX 和 MIN 值,从而允许如果查询指定的值超出分区的 (min, max) 范围,则 reader 跳过行。

请注意,Spark 中的排序比重新分区更昂贵,并且需要一个额外的阶段。在幕后 Spark 将首先确定一个阶段的拆分,然后将数据洗牌到另一个阶段的这些拆分中。

(3) df.rdd.partitionBy(customPartitioner).toDF().write.parquet(writePath)

如果您在 Scala 上使用 spark,那么您可以编写一个客户分区器,它可以克服基于散列的分区器的恼人问题。不幸的是,这在 pySpark 中不是一个选项。如果您真的想在 pySpark 中编写自定义分区程序,我发现使用 rdd.repartitionAndSortWithinPartitions 是可能的,尽管有点尴尬。 :

df.rdd \
.keyBy(sort_key_function) \ # Convert to key-value pairs
.repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS,
partitionFunc=part_func) \
.values() # get rid of keys \
.toDF().write.parquet(writePath)

也许其他人知道在 pyspark 的数据帧上使用自定义分区器的更简单方法?

关于apache-spark - pyspark:有效地让partitionBy写入与原始表相同数量的总分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50775870/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com