
python - Optimization for processing big data in pyspark


Not a problem -> need advice

I am working with 20 GB + 6 GB = 26 GB of CSV files on a 1+3 cluster (1 master, 3 workers, 16 GB RAM each).

This is how I am doing it:

from pyspark import StorageLevel

df = spark.read.csv()   # 20 GB
df1 = spark.read.csv()  # 6 GB
df_merged = df.join(df1, 'name', 'left')  # merging
df_merged.persist(StorageLevel.MEMORY_AND_DISK)  # if I do MEMORY_ONLY will I gain more performance?????
print('No. of records found: ', df_merged.count())  # just ensure persist by calling an action
df_merged.registerTempTable('table_satya')
query_list = [query1, query2, query3]  # sql query strings to be fired
city_list = [city1, city2, city3...total 8 cities]
file_index = 0  # will create files based on increasing index
for query_str in query_list:
    result = spark.sql(query_str)  # ex: select * from table_satya where date >= '2016-01-01'
    # result.persist()  # will it increase performance?
    for city in city_list:
        df_city = result.where(result.city_name == city)
        # store as csv file (pandas-style single file)
        df_city.toPandas().to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1

df_merged.unpersist()  # do I even need to do this, or can Spark handle it internally?

Currently it takes a very long time.

# persist (on count()) - 34 min
# each result (firing each sql query) - around 2 min * 8 cities = 16 min of toPandas() ops
# each toPandas().to_csv() - around 2 min
# for 3 queries: 16 * 3 = 48 min
# total: 34 + 48 = 82 min  ### need optimization seriously

So can someone suggest how to optimize the above process for better performance (both time and memory)?

The reason I am concerned: I was doing the above on a Python/pandas setup (a single 64 GB machine with serialized pickle data) and could finish it in 8-12 minutes. Since my data volume seems to keep growing, I need to adopt a technology like Spark.

Thanks in advance. :)

Best Answer

I think your best option is to cut the source data down to size. You mention that your source data has 90 cities, but you are only interested in 8 of them. Filter out the cities you don't want and keep the ones you do want in separate csv files:

import itertools
import csv

city_list = [city1, city2,city3...total 8 cities]

with open('f1.csv', 'rb') as f1, open('f2.csv', 'rb') as f2:
    r1, r2 = csv.reader(f1), csv.reader(f2)
    header = next(r1)
    next(r2)  # discard headers in second file
    city_col = header.index('city_name')
    city_files = []
    city_writers = {}
    try:
        for city in city_list:
            f = open(city + '.csv', 'wb')
            city_files.append(f)
            writer = csv.writer(f)
            writer.writerow(header)
            city_writers[city] = writer
        for row in itertools.chain(r1, r2):
            city_name = row[city_col]
            if city_name in city_writers:
                city_writers[city_name].writerow(row)
    finally:
        for f in city_files:
            f.close()

Then iterate over each city, create a DataFrame for that city, and run your three queries in a nested loop. Each DataFrame should have no problem fitting in memory, and the queries should run quickly since they operate on a much smaller data set.
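A minimal sketch of that second stage, assuming pandas DataFrames and that each query can be expressed as a simple filter. query1/query2/query3 and city_list are the same placeholders as in the question, and the date condition in the comment is just borrowed from the example query there:

import pandas as pd

file_index = 0
for city in city_list:  # the same 8-city list used above
    df_city = pd.read_csv(city + '.csv')  # per-city file written by the filtering step
    for query_func in (query1, query2, query3):  # hypothetical: each SQL query rewritten as a pandas filter
        # e.g. query1 could be something like: lambda d: d[d['date'] >= '2016-01-01']
        result = query_func(df_city)
        result.to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1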

Regarding python - Optimization for processing big data in pyspark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40089822/
