
python - Multiprocessing in Python slower than single-threaded


My original question was about parallelism under Python. Since the question remained unanswered, however, I deleted it and will try to summarize my conclusions instead. Hopefully it helps somebody...

In general, there are two main ways to make code run in parallel - the threading and the multiprocessing libraries.

According to many posts on stackoverflow.com, the threading library is able to share memory across threads efficiently, but runs all threads on a single core (in CPython, the Global Interpreter Lock prevents threads from executing Python bytecode simultaneously). It can therefore speed up code whose bottleneck is I/O operations. I am not sure whether the library has many real-life applications, though...
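To make the I/O-bound case concrete, here is a minimal sketch of mine (not from the original post) that simulates blocking I/O with time.sleep; because the GIL is released while a thread sleeps, the waits overlap:

import time
import threading

def fake_io_task(task_id, results):
    """ simulate a blocking I/O call (e.g. a network request) with sleep """
    time.sleep(0.5)  # the GIL is released during the sleep, so other threads run
    results[task_id] = task_id

if __name__ == '__main__':
    results = {}
    start = time.time()

    # ten simulated I/O tasks run concurrently, so the total wall time is
    # close to 0.5s rather than 10 * 0.5s = 5s
    threads = [threading.Thread(target=fake_io_task, args=(i, results)) for i in range(10)]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]

    print(' elapsed time: ' + str(time.time() - start) + 's')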

If your code is CPU-intensive (sometimes called CPU-bound), the multiprocessing library might solve your problem. It distributes the work across processes running on separate cores. However, many people (including me) have observed that such multi-core code can be significantly slower than its single-core counterpart. The problem is presumably caused by the fact that individual processes cannot share memory efficiently - data is copied around extensively, which creates considerable overhead. As my code below shows, the size of the overhead depends heavily on the input data type, and the problem is more severe on Windows than on Linux. I have to say that parallelism is my biggest disappointment with Python - apparently Python was not designed with parallelism in mind...
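One way to see why the input data type matters: multiprocessing serializes every argument with pickle before handing it to a child process. The minimal sketch below (my own illustration; the sizes match the examples that follow) times how long the serialization alone takes for a pandas DataFrame versus an equivalent numpy array:

import pickle
import time as tm
import numpy as np
import pandas as pd

def pickling_time(obj, repeats=100):
    """ average time needed to serialize obj with pickle """
    start_time = tm.time()
    for _ in range(repeats):
        pickle.dumps(obj)
    return (tm.time() - start_time) / repeats

if __name__ == '__main__':
    npv = np.array([100.0] * 1200)
    print('numpy array:      ' + str(pickling_time(npv)) + 's')
    print('pandas DataFrame: ' + str(pickling_time(pd.DataFrame({'npv': npv}))) + 's')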

The first piece of code distributes a pandas DataFrame across the cores using Process.

import numpy as np
import math as mth
import pandas as pd
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
    """ multiple core dummy valuation function (based on single core function) """

    # simulate a valuation whose cost is proportional to the number of bonds
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)

    npv[core_idx] = np.array(bnds_info['npv'])

def split_bnds_info(bnds_info, cores_no):
    """ cut dataframe with bond definitions into pieces - one piece per core """

    bnds_info_mp = []
    bnds_no = len(bnds_info)
    batch_size = mth.ceil(np.float64(bnds_no) / cores_no)  # number of bonds allocated to one core

    # split dataframe among cores
    for idx in range(cores_no):
        lower_bound = int(idx * batch_size)
        upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
        bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy())

    # return list of dataframes
    return bnds_info_mp

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """

    # shared dictionary served by a manager process - every write crosses process boundaries
    manager = mp.Manager()
    npv = manager.dict()

    bnds_info_mp = split_bnds_info(bnds_info, cores_no)

    processes = [mp.Process(target=bnd_calc_npv_dummy, args=(bnds_info_mp[core_idx], core_idx, npv)) for core_idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]

    # return NPV of individual bonds, gathered in core order
    return np.hstack([npv[core_idx] for core_idx in range(cores_no)])

if __name__ == '__main__':

    # create dummy dataframe
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = {'currency_name': 'EUR', 'npv': 100}
    bnds_info = pd.DataFrame(bnds_info, index=range(1))
    bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index=True)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv = np.array(bnds_info['npv'])
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

The second piece of code is identical to the previous one - the only difference is that this time we use a numpy array instead of a pandas DataFrame, and the performance difference is huge (compare how the runtime changes on a single core with how it changes on multiple cores).

import numpy as np
import math as mth
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
    """ multiple core dummy valuation function (based on single core function) """

    # simulate a valuation whose cost is proportional to the number of bonds
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)

    npv[core_idx] = bnds_info

def split_bnds_info(bnds_info, cores_no):
    """ cut array with bond definitions into pieces - one piece per core """

    bnds_info_mp = []
    bnds_no = len(bnds_info)
    batch_size = mth.ceil(np.float64(bnds_no) / cores_no)  # number of bonds allocated to one core

    # split array among cores
    for idx in range(cores_no):
        lower_bound = int(idx * batch_size)
        upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
        bnds_info_mp.append(bnds_info[lower_bound : upper_bound])

    # return list of arrays
    return bnds_info_mp

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """

    manager = mp.Manager()
    npv = manager.dict()

    bnds_info_mp = split_bnds_info(bnds_info, cores_no)

    processes = [mp.Process(target=bnd_calc_npv_dummy, args=(bnds_info_mp[core_idx], core_idx, npv)) for core_idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]

    # return NPV of individual bonds, gathered in core order
    return np.hstack([npv[core_idx] for core_idx in range(cores_no)])

if __name__ == '__main__':

    # create dummy array
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = np.array([100] * bnds_no)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

The last piece of code uses Pool instead of Process. The runtime is slightly better.

import numpy as np
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info):
    """ multiple core dummy valuation function (based on single core function) """

    try:
        # get number of bonds
        bnds_no = len(bnds_info)
    except TypeError:
        # a single float has no len() - treat it as one bond
        bnds_no = 1

    tm.sleep(0.0001 * bnds_no)

    return bnds_info

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """

    # each bond is sent to the pool as a separate task
    pool = mp.Pool(processes=cores_no)
    npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist())
    pool.close()
    pool.join()

    # return NPV of individual bonds
    return npv

if __name__ == '__main__':

    # create dummy array
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = np.array([100.0] * bnds_no)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = (tm.time() - start_time)
    print(' elapsed time: ' + str(elapsed_time) + 's')
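A further knob worth trying here (my addition, not part of the original post) is the chunksize argument of pool.map: it batches many tiny tasks into a single inter-process message, which cuts the per-task serialization and IPC overhead. A minimal sketch:

import time as tm
import multiprocessing as mp

def tiny_task(x):
    """ a task so small that messaging overhead dominates without batching """
    tm.sleep(0.0001)
    return x

if __name__ == '__main__':
    cores_no = 4
    items = list(range(1200))

    pool = mp.Pool(processes=cores_no)
    start_time = tm.time()
    # chunksize=300 sends 300 tasks per message; the value is an
    # illustrative guess and should be tuned for the actual workload
    npv = pool.map(tiny_task, items, chunksize=300)
    pool.close()
    pool.join()
    print(' elapsed time: ' + str(tm.time() - start_time) + 's')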

So, my conclusion is that parallelism as implemented in Python is of little use in real-life applications (I used Python 2.7.13 and Windows 7). Best regards,

Macky

PS: If somebody is able to fix the code, I will gladly change my mind...

Best Answer

Multiprocessing works best when parts of the problem can be computed independently, for example with a multiprocessing.Pool. Each worker process in the pool then handles a portion of the input and returns its result to the main process.

If all processes need to modify data across the entire input array, then the synchronization overhead of the Manager is likely to destroy any gains from multiprocessing.
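As a minimal sketch of that advice (my own illustration, not code from the answer): split the input into one chunk per worker, let each worker value its chunk independently, and collect the results via the return values instead of writing into a shared Manager dictionary:

import numpy as np
import multiprocessing as mp

def value_chunk(chunk):
    """ value one chunk of bonds independently of the other workers """
    return chunk * 1.01  # placeholder computation standing in for a real valuation

if __name__ == '__main__':
    cores_no = 4
    bnds_info = np.array([100.0] * 1200)

    pool = mp.Pool(processes=cores_no)
    # np.array_split produces one piece per worker; results travel back as
    # ordinary return values, so no Manager synchronization is involved
    npv = np.hstack(pool.map(value_chunk, np.array_split(bnds_info, cores_no)))
    pool.close()
    pool.join()

    print(npv[:5])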

A similar question about multiprocessing in Python being slower than single-threaded can be found on Stack Overflow: https://stackoverflow.com/questions/47490334/
