gpt4 book ai didi

python - Pyspark Out of Memory 窗口函数

转载 作者:太空宇宙 更新时间:2023-11-04 02:33:16 25 4
gpt4 key购买 nike

我发现我编写的 pyspark 脚本存在一些可扩展性问题,想知道是否有人能够阐明一些问题。

我有一个与此处介绍的用例非常相似的用例:

Separate multi line record with start and end delimiter

因为我有一些多行数据,记录之间有一个逻辑分隔符。例如。数据看起来像:

AA123
BB123
CCXYZ
AA321
BB321
CCZYX
...

使用上一个答案中的示例,我使用类似...的脚本将其分成多条记录

spark = SparkSession \
.builder \
.appName("TimetableSession") \
#Played around with setting the available memory at runtime
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.getOrCreate()

files = os.path.join("data","*_lots_of_gzipped_files.gz")
df=spark.sparkContext.textFile(files).toDF()
df=df.withColumn("id", monotonically_increasing_id())

w=Window.partitionBy().orderBy('id')
df=df.withColumn('AA_indicator', expr("case when entry like 'AA%' then 1 else 0 end"))
#!!!Blowing up with OOM errors here at scale!!!
df=df.withColumn('index', sum('AA_indicator').over(w))
df.show()

+--------------------+---+------------+-----+
| entry| id|AA_indicator|index|
+--------------------+---+------------+-----+
| AA123| 1| 1| 1|
| BB123| 2| 0| 1|
| CCXYZ| 3| 0| 1|
| AA321| 4| 1| 2|
| BB321| 5| 0| 2|
| CCZYX| 6| 0| 2|
+--------------------+---+------------+-----+

这似乎适用于合理大小的数据(例如 50MB 的数据),当我将其扩展到 > 1GB 的数据时,我看到了 Java OOM 错误。即使在尝试为 spark.driver/executor 分配 > 20GB 内存时,我也遇到了同样的问题。

我认为问题在于数据分区的窗口和所有内容都立即收集到内存中而不是并行化?但我可能离题太远了。

我正在使用 jupyter pyspark notebook 在独立的 docker 容器中运行此脚本 https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook .

在索引“记录”的更好方法或如何更好地解决问题方面的任何帮助将不胜感激。

最佳答案

可能是因为您使用的窗口没有PARTITION BY:

Window.partitionBy().orderBy('id')

在这种情况下,Spark 不会分发数据并按顺序在一台机器上处理所有记录。

有很多 gzip 压缩 文件会使情况变得更糟,因为 gzip 压缩无法拆分。所以每个文件都在一台机器上加载,也可以OOM。

总的来说,这对 Spark 没有好处。

关于python - Pyspark Out of Memory 窗口函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48565965/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com