
python - Multiprocessing via pool.map_async is very slow for a large dataframe


I am reading an 800,000-row table into a dataframe. I then loop over every column, and over every row within each column, to gather statistics such as maximum length, minimum length, maximum value, distinct values, and so on.

I have access to a 32-core machine via SLURM, so I wanted to use pool.map_async to process each column of the dataframe in a separate process.

It is much slower than just using a for loop.

I tried reducing the number of CPUs to 8, 4, and so on, to see whether process startup was the cause.

I suspect it is the serialization of the 800,000-row pandas Series that makes it slow?
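A quick way to test that suspicion is to time how long one column takes to pickle, since pool.map_async has to serialise every Series it hands to a worker. This is only a minimal measurement sketch with synthetic stand-in data, not part of my actual job:

import pickle
import time

import pandas as pd

# synthetic stand-in for the 800,000-row frame loaded from Oracle,
# purely to illustrate the measurement
df = pd.DataFrame({"REG_ID": range(800000),
                   "REG_NAME": ["registration_" + str(i) for i in range(800000)]})

# pickle.dumps(series) is roughly what the pool serialises once per submitted task
for col_name, series in df.iteritems():
    start = time.time()
    payload = pickle.dumps(series)
    print("{}: {:.1f} MB pickled in {:.3f}s".format(
        col_name, len(payload) / 1e6, time.time() - start))

My code: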

import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs
import pandas as pd
import pandas.io.sql as psql


def get_data():
    print("Starting Job: " + str(datetime.datetime.now()))

    # Step 1: init multiprocessing.Pool()
    pool = mp.Pool(mp.cpu_count())
    print("CPU Count: " + str(mp.cpu_count()))

    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=dsn_tns)

    stats_results = [["OWNER", "TABLE", "COLUMN_NAME", "RECORD_COUNT", "DISTINCT_VALUES",
                      "MIN_LENGTH", "MAX_LENGTH", "MIN_VAL", "MAX_VAL"]]

    sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"

    cur = con.cursor()
    print("Start Executing SQL: " + str(datetime.datetime.now()))

    df = psql.read_sql(sql, con)

    print("End SQL Execution: " + str(datetime.datetime.now()))

    col_names = df.columns.values.tolist()
    col_index = 0

    print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))

    # Step 2: iterate through each column, gathering stats from each column in parallel;
    # every worker receives a (column_name, Series) tuple from df.iteritems()
    proc_results = pool.map_async(gs.get_column_stats, df.iteritems()).get()

    # Step 3: don't forget to close the pool
    pool.close()
    pool.join()

    for result in proc_results:
        stats_results.append(result)

    print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))

    cur.close()

    outfile = open(r'C:\jupyter\Experiment\stats_dim_registration_set.csv', 'w')
    writer = csv.writer(outfile, quoting=csv.QUOTE_ALL, lineterminator='\n')
    writer.writerows(stats_results)
    outfile.close()
    print("Ending Job: " + str(datetime.datetime.now()))


get_data()

The code that is called asynchronously:

import os
import sys


def strip_crlf(value):
    return value.replace('\n', ' ').replace('\r', '')


def get_column_stats(args):
    # args is a tuple: the first value is the column name of the pandas Series,
    # the second value is the pandas Series itself
    col_name, rs = args
    sys.stdout = open("col_" + col_name + ".out", "a")

    print("Starting Iteration of Column: " + col_name)
    max_length = 0
    min_length = 100000  # arbitrarily large number!!

    max_value = ""
    min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"  # arbitrarily "large" string!!

    distinct_value_count = 0

    has_values = False       # does the column have any non-null values?
    has_null_values = False

    row_count = 0

    # create a dictionary into which we can add the individual items present in each row of data;
    # a dictionary will not store the same key more than once, so we can simply count the
    # dictionary entries at the end
    distinct_values = {}

    row_index = 0

    # go through every row of the current column to gather the stats
    for row_value in rs.values:
        row_count += 1

        if row_value is None:
            value_length = 0
        else:
            value_length = len(str(row_value))

        if value_length > max_length:
            max_length = value_length

        if value_length < min_length:
            if value_length > 0:
                min_length = value_length

        if row_value is not None:
            if str(row_value) > max_value:
                max_value = str(row_value)
            if str(row_value) < min_value:
                min_value = str(row_value)

        # capture distinct values (nulls are counted under the key "Null")
        if row_value is None:
            row_value = "Null"
            has_null_values = True
        else:
            has_values = True
        distinct_values[row_value] = 1

        row_index += 1
    # end row for

    distinct_value_count = len(distinct_values)

    if not has_values:
        distinct_value_count = None
        min_length = None
        max_length = None
        min_value = None
        max_value = None
    elif has_null_values and distinct_value_count > 0:
        # "Null" was added to distinct_values above, so remove it from the count
        distinct_value_count -= 1

    if min_length == 0 and max_length > 0 and has_values:
        min_length = max_length

    print("Ending Iteration of Column: " + col_name)

    return ["ARIEL", "DIM_REGISTRATION_SET", col_name, row_count, distinct_value_count, min_length, max_length,
            strip_crlf(str(min_value)), strip_crlf(str(max_value))]

Best Answer

You can speed this up by having each process query just one column, instead of querying all of the columns up front, which then have to be copied into every child process.

One idea is to query only the column names in the central process and send just a column name to each child process, which then runs SELECT columnX rather than SELECT *.
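A rough sketch of that idea, reusing the table and connection details from the question; the ALL_TAB_COLUMNS lookup and the per-worker connection are illustrative assumptions, not part of the original post:

import multiprocessing as mp

import cx_Oracle
import pandas.io.sql as psql

import get_column_stats as gs

DSN = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')


def stats_for_column(col_name):
    # each worker opens its own connection and pulls only its own column,
    # so only a short column-name string crosses the process boundary
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=DSN)
    try:
        sql = 'SELECT "{}" FROM ARIEL.DIM_REGISTRATION_SET'.format(col_name)
        series = psql.read_sql(sql, con)[col_name]
    finally:
        con.close()
    return gs.get_column_stats((col_name, series))


def get_column_names():
    # query only the column names in the central process
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=DSN)
    cur = con.cursor()
    cur.execute("SELECT column_name FROM all_tab_columns "
                "WHERE owner = 'ARIEL' AND table_name = 'DIM_REGISTRATION_SET'")
    col_names = [row[0] for row in cur]
    con.close()
    return col_names


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()) as pool:
        results = pool.map(stats_for_column, get_column_names())

Each task now ships a few bytes of column name to the worker instead of an 800,000-row Series, and the Oracle server does the column slicing.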

Regarding "python - Multiprocessing via pool.map_async is very slow for a large dataframe", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56615216/
