gpt4 book ai didi

java - 从显示 OutOfMemoryError : Java heap space 的大型 Pyspark 数据帧创建字典

转载 作者:行者123 更新时间:2023-12-01 14:31:44 25 4
gpt4 key购买 nike

我见过并尝试过很多existing StackOverflow 发布了有关此问题的帖子,但没有任何效果。我猜我的 JAVA 堆空间没有我的大型数据集预期的那么大,我的数据集包含 6.5M 行。我的 Linux 实例包含 64GB 内存和 4 个内核。根据这个suggestion我需要修复我的代码,但我认为从 pyspark 数据框制作字典应该不会很昂贵。如果有任何其他计算方法,请告诉我。

我只想从我的 pyspark 数据框制作一个 python 字典,这是我的 pyspark 数据框的内容,

property_sql_df.show() 显示,

+--------------+------------+--------------------+--------------------+
| id|country_code| name| hash_of_cc_pn_li|
+--------------+------------+--------------------+--------------------+
| BOND-9129450| US|Scotron Home w/Ga...|90cb0946cf4139e12...|
| BOND-1742850| US|Sited in the Mead...|d5c301f00e9966483...|
| BOND-3211356| US|NEW LISTING - Com...|811fa26e240d726ec...|
| BOND-7630290| US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
| BOND-7175508| US|East Hampton Retr...|90cb0946cf4139e12...|
+--------------+------------+--------------------+--------------------+

我想要的是用 hash_of_cc_pn_li 作为 和 id 作为列表值制作一个字典。

预期输出

{
"90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"]
"d5c301f00e9966483": ["BOND-1742850","BOND-7630290"]
}

到目前为止我尝试了什么,

方式一:导致java.lang.OutOfMemoryError: Java heap space

%%time
duplicate_property_list = {}
for ind in property_sql_df.collect():
hashed_value = ind.hash_of_cc_pn_li
property_id = ind.id
if hashed_value in duplicate_property_list:
duplicate_property_list[hashed_value].append(property_id)
else:
duplicate_property_list[hashed_value] = [property_id]

方式 2:由于在 pyspark 上缺少 native OFFSET 而无法工作

%%time
i = 0
limit = 1000000
for offset in range(0, total_record,limit):
i = i + 1
if i != 1:
offset = offset + 1

duplicate_property_list = {}
duplicate_properties = {}

# Preparing dataframe
url = '''select id, hash_of_cc_pn_li from properties_df LIMIT {} OFFSET {}'''.format(limit,offset)
properties_sql_df = spark.sql(url)

# Grouping dataset
rows = properties_sql_df.groupBy("hash_of_cc_pn_li").agg(F.collect_set("id").alias("ids")).collect()
duplicate_property_list = { row.hash_of_cc_pn_li: row.ids for row in rows }

# Filter a dictionary to keep elements only where duplicate cound
duplicate_properties = filterTheDict(duplicate_property_list, lambda elem : len(elem[1]) >=2)

# Writing to file
with open('duplicate_detected/duplicate_property_list_all_'+str(i)+'.json', 'w') as fp:
json.dump(duplicate_property_list, fp)

我现在在控制台上得到的内容:

java.lang.OutOfMemoryError: Java heap space

并在 Jupyter 笔记本输出 上显示此错误

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33097)

这是我在这里提出的后续问题: Creating dictionary from Pyspark dataframe showing OutOfMemoryError: Java heap space

最佳答案

为什么不在 Executors 中保留尽可能多的数据和处理,而不是收集到 Driver?如果我理解正确,您可以使用 pyspark 转换和聚合并直接保存到 JSON,因此利用执行程序,然后将该 JSON 文件(可能已分区)作为字典加载回 Python。不可否认,您引入了 IO 开销,但这应该可以让您解决 OOM 堆空间错误。一步一步:

import pyspark.sql.functions as f


spark = SparkSession.builder.getOrCreate()
data = [
("BOND-9129450", "90cb"),
("BOND-1742850", "d5c3"),
("BOND-3211356", "811f"),
("BOND-7630290", "d5c3"),
("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

df.groupBy(
f.col("hash_of_cc_pn_li"),
).agg(
f.collect_set("id").alias("id") # use f.collect_list() here if you're not interested in deduplication of BOND-XXXXX values
).write.json("./test.json")

检查输出路径:

ls -l ./test.json

-rw-r--r-- 1 jovyan users 0 Jul 27 08:29 part-00000-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 50 Jul 27 08:29 part-00039-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00043-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00159-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 0 Jul 27 08:29 _SUCCESS
_SUCCESS

作为 dict 加载到 Python:

import json
from glob import glob

data = []
for file_name in glob('./test.json/*.json'):
with open(file_name) as f:
try:
data.append(json.load(f))
except json.JSONDecodeError: # there is definitely a better way - this is here because some partitions might be empty
pass

最后

{item['hash_of_cc_pn_li']:item['id'] for item in data}

{'d5c3': ['BOND-7630290', 'BOND-1742850'],
'811f': ['BOND-3211356'],
'90cb': ['BOND-9129450', 'BOND-7175508']}

希望对您有所帮助!谢谢你提出的好问题!

关于java - 从显示 OutOfMemoryError : Java heap space 的大型 Pyspark 数据帧创建字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63109775/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com