gpt4 book ai didi

python - 如何修复 BrokenProcessPool : error for concurrent. future ProcessPoolExecutor

转载 作者:行者123 更新时间:2023-12-04 00:25:49 26 4
gpt4 key购买 nike

使用 concurrent.futures.ProcessPoolExecutor 我正在尝试运行第一段代码以并行执行函数“Calculate_Forex_Data_Derivatives(data,gride_spacing)”。调用结果 executor_list[i].result() 时,我得到“BrokenProcessPool:进程池中的进程在 future 运行或挂起时突然终止。”我尝试运行将函数的多次调用发送到处理池的代码,以及运行代码仅向处理池发送一次调用,两者都导致错误。

我还用一段更简单的代码(提供的第二个代码)测试了代码的结构,其中调用函数的输入类型相同,它工作正常。我可以在两段代码之间看到的唯一不同之处是第一个代码调用了“findiff”模块中的函数“FinDiff(axis,grid_spacing,derivative_order)”。此函数与“Calculate_Forex_Data_Derivatives(data,gride_spacing)”一起在正常串联运行时完美地工作。

我正在使用 Anaconda 环境、Spyder 编辑器和 Windows。

任何帮助,将不胜感激。

#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."

import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures

def Calculate_Forex_Data_Derivatives(forex_data,dt): #function to run in parallel
try:
dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
except IndexError:
dClose_dt = np.nan

try:
d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
except IndexError:
d2Close_dt2 = np.nan

try:
d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
except IndexError:
d3Close_dt3 = np.nan

return dClose_dt, d2Close_dt2, d3Close_dt3

#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays

input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
input_1.append(forex_data['Close'].values[:forex_data_index+1])
input_2.append(dt[:forex_data_index+1])


def multi_processing():
executors_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for index in range(len(input_1)):
executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))

return executors_list

if __name__ == '__main__':
print('calculating derivatives')
executors_list = multi_processing()

for output in executors_list
print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."


##############################################################


#simple example that runs fine

def function(x,y): #function to run in parallel
try:
asdf
except NameError:
a = (x*y)[0]
b = (x+y)[0]

return a,b

x=[np.array([0,1,2]),np.array([3,4,5])] #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]

def multi_processing():
executors_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for index,_ in enumerate(x):
executors_list.append(executor.submit(function,x[index],y[index]))

return executors_list

if __name__ == '__main__':
executors_list = multi_processing()

for output in executors_list: #prints as expected
print(output.result()) #(0, 6)
#(27, 12)

最佳答案

我知道破坏 ProcessPoolExecutor 管道的三种典型方法:

操作系统终止/终止

您的系统遇到限制,很可能是内存,并开始终止进程​​。由于 windows 上的 fork 会克隆您的内存内容,因此在使用大型 DataFrame 时这并非不可能。

如何识别

  • 在任务管理器中检查内存消耗。
  • 除非你的 DataFrames 占用了你一半的内存,否则它应该会随着 max_workers=1 消失。 , 然而这并不是明确的。

  • worker 的自我终止

    子进程的 Python 实例由于未引发适当异常的某些错误而终止。一个示例是导入的 C 模块中的段错误。

    如何识别

    由于您的代码在没有 PPE 的情况下正常运行,我能想到的唯一情况是某个模块不是多处理安全的。然后它也有机会随着 max_workers=1 消失。 .也有可能通过在创建 worker 后立即手动调用函数来在主进程中引发错误(调用 executor.submit 的 for 循环之后的行。
    否则可能真的很难识别,但在我看来,这是最不可能的情况。

    PPE 规范中的异常(exception)情况

    管道的子进程端(即处理通信的代码)可能会崩溃,这会导致适当的异常,遗憾的是无法与主进程通信。

    如何识别

    由于代码(希望)经过良好测试,因此主要嫌疑人在于返回数据。它必须被腌制并通过套接字发回——这两个步骤都可能崩溃。所以你必须检查:
  • 返回数据是否可以选择?
  • 腌制的物体是否足够小可以发送(大约 2GB)?

  • 因此,您可以尝试返回一些简单的虚拟数据,或者明确检查这两个条件:
        if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9: 
    raise RuntimeError('return data can not be sent!')

    在 Python 3.7 中,这个问题得到了修复,它发回了异常。

    关于python - 如何修复 BrokenProcessPool : error for concurrent. future ProcessPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57031253/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com