gpt4 book ai didi

python - 如何减少 python 中的多处理时间

转载 作者:太空宇宙 更新时间:2023-11-03 15:56:12 25 4
gpt4 key购买 nike

我试图在Python中构建多处理来降低计算速度,但似乎在多处理之后,整体计算速度显着下降。我创建了 4 个不同的进程,并将 dataFrame 拆分为 4 个不同的 dataframe,这将作为每个进程的输入。对每个进程进行计时后,似乎开销成本很大,并且想知道是否有方法可以减少这些开销成本。

我使用的是windows7,python 3.5,我的机器有8核。

def doSomething(args, dataPassed,):

processing data, and calculating outputs

def parallelize_dataframe(df, nestedApply):
df_split = np.array_split(df, 4)
pool = multiprocessing.Pool(4)
df = pool.map(nestedApply, df_split)
print ('finished with Simulation')
time = float((dt.datetime.now() - startTime).total_seconds())

pool.close()
pool.join()

def nestedApply(df):

func2 = partial(doSomething, args=())
res = df.apply(func2, axis=1)
res = [output Tables]
return res

if __name__ == '__main__':

data = pd.read_sql_query(query, conn)

parallelize_dataframe(data, nestedApply)

最佳答案

我建议使用队列而不是提供 DataFrame 作为 block 。您需要大量资源来复制每个 block ,并且需要相当长的时间。如果您的 DataFrame 非常大,您可能会耗尽内存。使用队列,您可以从 pandas 中的快速迭代器中受益。这是我的方法。开销随着工作人员的复杂性而降低。不幸的是,我的工作人员远未简单地真正展示这一点,但 sleep 模拟了一点复杂性。

import pandas as pd
import multiprocessing as mp
import numpy as np
import time


def worker(in_queue, out_queue):
for row in iter(in_queue.get, 'STOP'):
value = (row[1] * row[2] / row[3]) + row[4]
time.sleep(0.1)
out_queue.put((row[0], value))

if __name__ == "__main__":
# fill a DataFrame
df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))

in_queue = mp.Queue()
out_queue = mp.Queue()

# setup workers
numProc = 2
process = [mp.Process(target=worker,
args=(in_queue, out_queue)) for x in range(numProc)]

# run processes
for p in process:
p.start()

# iterator over rows
it = df.itertuples()

# fill queue and get data
# code fills the queue until a new element is available in the output
# fill blocks if no slot is available in the in_queue
for i in range(len(df)):
while out_queue.empty():
# fill the queue
try:
row = next(it)
in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple
except StopIteration:
break
row_data = out_queue.get()
df.loc[row_data[0], "Result"] = row_data[1]

# signals for processes stop
for p in process:
in_queue.put('STOP')

# wait for processes to finish
for p in process:
p.join()

使用 numProc = 2 每个循环需要 50 秒,使用 numProc = 4 则快两倍。

关于python - 如何减少 python 中的多处理时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40747674/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com