gpt4 book ai didi

apache-spark - 如何在不使用合并的情况下在本地系统的单个文件中写入 Spark 数据帧

转载 作者:行者123 更新时间:2023-12-04 09:00:40 24 4
gpt4 key购买 nike

我想从 pyspark 数据帧生成一个 avro 文件,目前我正在做 coalesce如下

df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')
但这会导致内存问题,因为所有数据都将在写入之前被提取到内存中,而且我的数据大小每天都在持续增长。所以我想按每个分区写入数据,以便数据以块的形式写入磁盘并且不会引发 OOM 问题。我发现 toLocalIterator有助于实现这一目标。但我不确定如何使用它。我尝试了以下用法并返回所有行
iter = df.toLocalIterator()
for i in iter:
print('writing some data')
# write the data into disk/file
迭代器迭代每一行而不是每个分区。我该怎么做?

最佳答案

当您这样做时df = df.coalesce(1)所有数据都收集到其中一个工作节点中。如果该节点由于节点上的资源限制而无法处理如此庞大的任务,则作业将因 OOM 错误而失败。
根据 Spark 文档 toLocalIterator 返回包含当前数据集中所有行的迭代器 可以消耗的最大内存相当于这个数据集中最大的分区
toLocalIterator 如何工作?
第一个分区被发送到驱动程序。如果继续迭代并到达第一个分区的末尾,第二个分区将被发送到驱动程序节点,依此类推,直到最后一个分区..这就是为什么(它可以占用的最大内存=最大分区)
确保您的主节点有足够的内存和磁盘。
如果前一个分区处理完成,toLocalIterator.next() 方法确保拉下一个分区记录。

what you can do is 

//batch objects like 1000 per batch
df.toLocalIterator().foreach(obj => {
//add object in array
//if batch size is reached ...
//then serialize them and use FileOutputStream and save in local location
})
注意:确保缓存您的 parentDF ..否则在某些情况下,每个分区都需要重新计算。

关于apache-spark - 如何在不使用合并的情况下在本地系统的单个文件中写入 Spark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63574983/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com