gpt4 book ai didi

apache-spark - 将 Spark 数据帧写入单个 Parquet 文件

转载 作者:行者123 更新时间:2023-12-03 15:49:39 25 4
gpt4 key购买 nike

我正在尝试做一些非常简单的事情,但我遇到了一些非常愚蠢的斗争。我认为这一定与对 Spark 正在做什么的根本误解有关。我将不胜感激任何帮助或解释。

我有一个非常大的(~3 TB,~300MM 行,25k 分区)表,在 s3 中保存为 Parquet ,我想给某人一个小样本作为单个 Parquet 文件。不幸的是,这需要很长时间才能完成,我不明白为什么。我尝试了以下方法:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

然后当那不起作用时,我尝试了这个,我认为应该是一样的,但我不确定。 (我添加了 print 以进行调试。)
tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

当我观看 Yarn UI 时,打印语句和 write正在使用 25k 映射器。 count花了 3 分钟, show花了 25 分钟,然后 write花了大约 40 分钟,尽管它最终确实编写了我正在寻找的单个文件表。

在我看来,第一行应该取前 500 行并将它们合并到一个分区,然后其他行应该发生得非常快(在单个映射器/ reducer 上)。谁能看到我在这里做错了什么?有人告诉我也许我应该使用 sample而不是 limit但据我所知 limit应该快得多。那正确吗?

提前感谢您的任何想法!

最佳答案

我会联系print首先是函数问题,因为它是理解 spark 的基础。然后limit对比 sample .然后repartition对比 coalesce .

原因print函数以这种方式花费这么长时间是因为 coalesce是一个懒惰的转变。 spark 中的大多数转换都是惰性的,直到 才会被评估。行动被调用。

行动是做事的事情,(主要是)不要结果返回一个新的数据帧。赞 count , show .它们返回一个数字和一些数据,而 coalesce返回一个具有 1 个分区的数据帧(有点,见下文)。

发生的事情是您正在重新运行 sql 查询和 coalesce每次在 tiny 上调用操作时调用数据框。这就是他们为每次调用使用 25k 映射器的原因。

为了节省时间,添加 .cache()方法到第一行(无论如何,对于您的 print 代码)。

然后数据帧转换实际上在你的第一行执行,结果保存在你的 Spark 节点的内存中。

这不会对第一行的初始查询时间产生任何影响,但至少您不会再运行该查询 2 次,因为结果已被缓存,然后操作可以使用该缓存结果。

要将其从内存中删除,请使用 .unpersist()方法。

现在对于您尝试执行的实际查询...

这实际上取决于您的数据是如何分区的。如,它是否在特定字段等上进行分区...

您在问题中提到了它,但是 sample可能是正确的方法。

为什么是这样?
limit必须搜索 中的 500 个第一 行。除非您的数据按行号(或某种递增的 id)进行分区,否则前 500 行可以存储在 25k 分区中的任何一个中。

因此,spark 必须搜索所有这些值,直到找到所有正确的值。不仅如此,它还必须执行一个额外的步骤来对数据进行排序以获得正确的顺序。
sample只抓取 500 个随机值。更容易做,因为没有涉及数据的顺序/排序,也不必搜索特定分区的特定行。

虽然 limit可以更快,它也有它的,呃,限制。我通常只将它用于非常小的子集,例如 10/20 行。

现在进行分区....

我认为 coalesce 的问题是吗几乎改变分区。现在我不确定这一点,所以少许盐。

根据pyspark文档:

this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.



因此,您的 500 行实际上仍将位于 Spark 认为是 1 个虚拟分区的 25k 物理分区上。

使用 .repartition(1).cache() 引起洗牌(通常是坏的)并坚持 Spark 内存在这里可能是个好主意。因为当您 write 时,而不是让 25k 映射器查看物理分区,它应该只会导致 1 个映射器查看 Spark 内存中的内容。然后 write变得容易。您还在处理一个小子集,因此任何改组都应该(希望)是可管理的。

显然,这通常是不好的做法,并且不会改变 spark 在执行原始 sql 查询时可能想要运行 25k 映射器的事实。希望 sample照顾那个。

编辑以澄清改组,repartitioncoalesce

您在 4 节点集群上的 16 个分区中有 2 个数据集。您想加入它们并在 16 个分区中写入一个新数据集。

数据 1 的第 1 行可能在节点 1 上,数据 2 的第 1 行可能在节点 4 上。

为了将这些行连接在一起,spark 必须 body 移动一个或两个,然后写入新分区。

这是一个随机的,围绕集群物理移动数据。

一切都按 16 分区并不重要,重要的是数据在集群上的位置。
data.repartition(4)将数据从每个节点的每 4 组分区物理移动到每个节点的 1 个分区。

Spark 可能会将所有 4 个分区从节点 1 移动到其他 3 个节点,在这些节点上的新单个分区中,反之亦然。

我不认为它会这样做,但这是一个证明这一点的极端案例。

一个 coalesce(4)调用虽然不移动数据,但它更聪明。相反,它识别“我已经有每个节点 4 个分区和总共 4 个节点......我只是将每个节点的所有 4 个分区称为单个分区,然后我将有 4 个分区!”

所以它不需要移动任何数据,因为它只是将现有的分区组合成一个连接的分区。

关于apache-spark - 将 Spark 数据帧写入单个 Parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52206576/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com