
Python multiprocessing Pool map_async hangs

Reposted. Author: 行者123. Updated: 2023-11-28 19:03:20

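I have a list of 80,000 strings that I am running through a discourse parser, and to speed the process up I have been trying to use the Python multiprocessing package.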

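The parser code requires Python 2.7, and I am currently running it on a 2-core Ubuntu machine with a subset of the strings. For short lists (e.g. 20 strings) the process runs fine on both cores, but with a list of about 100 strings the two workers get stuck at different points (in some cases worker 1 stops only several minutes after worker 2). This happens before all the strings have finished and before anything is returned. With the same mapping function the cores stop at the same point every time, but the points change if I switch mapping functions, i.e. map vs map_async vs imap.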
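To make the three calls mentioned above concrete, here is a minimal Python 3 sketch (not the asker's code) that runs the same function through `Pool.map`, `Pool.map_async` and `Pool.imap`. `fake_discourse_process` is a hypothetical stand-in for the real parser, which is not shown. The `fork` start method is requested explicitly to match the Ubuntu setup described; it is not available on Windows, and `get_context` and `with Pool(...)` do not exist in Python 2.7.

```python
import multiprocessing


def fake_discourse_process(text):
    # Hypothetical stand-in for the real parser: returns (input, word count).
    return text, len(text.split())


def run_all_three(strings, processes=2, chunksize=2):
    ctx = multiprocessing.get_context("fork")  # Linux/macOS only
    # Leaving the with-block calls pool.terminate().
    with ctx.Pool(processes=processes) as pool:
        # map blocks until every result is ready and returns a list.
        via_map = pool.map(fake_discourse_process, strings, chunksize)
        # map_async returns an AsyncResult at once; get() waits for the list.
        async_result = pool.map_async(fake_discourse_process, strings, chunksize)
        via_map_async = async_result.get(timeout=60)
        # imap returns a lazy iterator that yields results in input order.
        via_imap = list(pool.imap(fake_discourse_process, strings, chunksize))
    return via_map, via_map_async, via_imap


if __name__ == "__main__":
    sample = ["the cat sat", "on the mat", "hello", "a b c d"]
    for name, res in zip(("map", "map_async", "imap"), run_all_three(sample)):
        print(name, res)
```

All three return the results in input order; they differ only in when the caller blocks (`map` immediately, `map_async` at `get()`, `imap` per item while iterating).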

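I tried removing the strings at those indices, but that had no effect, and those strings run fine in shorter lists. Judging by the print statements I added, when a process seems to hang, the current iteration appears to have finished its string; it just never moves on to the next one. It takes about an hour of run time to reach the point where both workers are stuck, and I have not been able to reproduce the problem in less time. The code involving the multiprocessing calls is: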

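```python
import csv
import multiprocessing
import sys

import pandas as pd

# discourse_process (the parser wrapper) is defined elsewhere in Combined_Script.py.


def main(initial_file, chunksize=2):
    # Read the first column of the CSV into a plain list of strings.
    entered_file = pd.read_csv(initial_file)
    entered_file = entered_file.iloc[:, 0].tolist()  # .ix is deprecated; .iloc is positional

    pool = multiprocessing.Pool()  # one worker per CPU core by default

    result = pool.map_async(discourse_process, entered_file, chunksize=chunksize)

    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit

    with open("final_results.csv", 'w') as f:
        writer = csv.writer(f)
        for listitem in result.get():
            writer.writerow([listitem[0], listitem[1]])


if __name__ == '__main__':
    main(sys.argv[1])
```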

When I stop the process with Ctrl-C (which doesn't always work), this is the error message I get:

```
^CTraceback (most recent call last):
  File "Combined_Script.py", line 94, in <module>
    main(sys.argv[1])
  File "Combined_Script.py", line 85, in main
    pool.join()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 474, in join
    p.join()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 145, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 154, in wait
    return self.poll(0)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 135, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Process PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
^CProcess PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 392, in put
    return send(obj)
KeyboardInterrupt
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None) # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None) # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
```

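When I watch memory usage with htop in another terminal window, it drops below 3% once the workers get stuck. This is my first attempt at parallel processing, and I'm not sure what else I'm missing.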
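The traceback shows the main process blocked in `pool.join()`, which has no timeout. One possible restructuring, not taken from the question or the answer, is sketched below for Python 3: it uses `imap` with `chunksize=1` (the case in which the multiprocessing docs describe `next(timeout)`) so each result is written as it arrives, and a stall raises `multiprocessing.TimeoutError` instead of blocking forever. `parse_one` is a hypothetical stand-in for `discourse_process`, and the 300-second timeout is an arbitrary assumption. It does not address why the workers stall; it only makes the stall visible.

```python
import csv
import multiprocessing


def parse_one(text):
    # Hypothetical stand-in for discourse_process: returns a 2-item row.
    return text, text.upper()


def process_to_csv(strings, out_path, per_item_timeout=300):
    ctx = multiprocessing.get_context("fork")  # matches the Ubuntu setup; not on Windows
    # Leaving the with-block (normally or via an exception) terminates the workers.
    with ctx.Pool() as pool:
        # chunksize=1 so that next(timeout) is supported, as the docs describe.
        results = pool.imap(parse_one, strings, chunksize=1)
        with open(out_path, "w", newline="") as f:
            writer = csv.writer(f)
            for _ in range(len(strings)):
                # Raises multiprocessing.TimeoutError instead of waiting forever.
                row = results.next(timeout=per_item_timeout)
                writer.writerow([row[0], row[1]])
                f.flush()  # keep partial output on disk if the run stalls later
    return out_path
```

In the question's script this would be called as `process_to_csv(entered_file, "final_results.csv")`, with `parse_one` replaced by `discourse_process`. On Python 2.7, `Pool` is not a context manager, so the equivalent would need explicit `pool.terminate()` / `pool.join()` calls.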

Best answer

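I couldn't solve the problem with the multiprocessing pool, but I came across the loky package and was able to run my code with it using the following lines: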

```python
import loky

executor = loky.get_reusable_executor(timeout=200, kill_workers=True)  # timeout: idle seconds before workers shut down
results = executor.map(discourse_process, entered_file)
```
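If adding a dependency is not an option, the standard library has a similar executor interface. The sketch below (Python 3.7+, not part of the answer) uses `concurrent.futures.ProcessPoolExecutor`; `parse_one` is a hypothetical stand-in for `discourse_process`, and the `fork` context is an assumption matching the Ubuntu setup. Unlike `multiprocessing.Pool`, this executor raises `BrokenProcessPool` when a worker process dies abruptly instead of waiting indefinitely; whether that would have made a difference in this case is not known.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def parse_one(text):
    # Hypothetical stand-in for discourse_process.
    return text, len(text)


def run_with_executor(strings, max_workers=2, result_timeout=None):
    ctx = multiprocessing.get_context("fork")  # Linux/macOS only
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) as executor:
        # Like loky's executor.map, results come back in input order.
        # Here `timeout` bounds the wait for results (TimeoutError if exceeded),
        # which differs from loky's idle-worker `timeout` parameter.
        return list(executor.map(parse_one, strings, timeout=result_timeout, chunksize=2))
```

Note that the two `timeout` arguments mean different things: in loky's `get_reusable_executor` it controls how long idle workers are kept alive, while in `Executor.map` it limits how long the caller waits for results.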

Regarding "Python multiprocessing Pool map_async hangs", the original question is on Stack Overflow: https://stackoverflow.com/questions/50162249/
