
python - Shutdown manager error "AttributeError: 'ForkAwareLocal' object has no attribute 'connection'" when using namespace and shared memory dict


I am trying to:

  1. Share a dataframe between processes
  2. Update a shared dict based on calculations performed on (but not changing) that dataframe

I am using multiprocessing.Manager() to create a dict in shared memory (to store results) and a Namespace to store/share the dataframe that I want to read from.

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    n = 100
    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

    print(shared_dict[1])

When I run the above code, it writes to shared_dict correctly, since my print statement executes with some data in it. I also get an error message from the manager:

Process Process-88:
Traceback (most recent call last):
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 788, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/henrysorsky/Library/Preferences/PyCharm2019.2/scratches/scratch_13.py", line 34, in edit_df_in_shared_dict
row_to_insert = namespace.df.loc[ind]
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 1099, in __getattr__
return callmethod('__getattribute__', (key,))
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 792, in _callmethod
self._connect()
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 779, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
c = SocketClient(address)
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

I understand this is coming from the manager and seems to be because it is not shutting down properly. The only similar issue I could find online:

Share list between process in python server

suggests joining all the child processes, which I am already doing.

Best Answer

So after a full night's sleep I realised that it was actually the reading of the dataframe in shared memory that was causing the issues, and that around the 20th child process or so, some of those reads were failing. I added a maximum number of processes that can run at once, and that solved it.

For anyone wondering, the code I used is:

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    # region define inputs

    max_jobs_running = 4
    n = 100

    # endregion

    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    jobs_running = 0
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

        jobs_running += 1

        # throttle: busy-wait until the number of live processes
        # drops back below the cap before starting another one
        if jobs_running >= max_jobs_running:
            while jobs_running >= max_jobs_running:
                jobs_running = 0
                for p in jobs:
                    jobs_running += p.is_alive()

    for p in jobs:
        p.join()

    for key, value in shared_dict.items():
        print(f"key: {key}")
        print(f"value: {value}")
        print("-" * 50)

This would probably be better handled by a Queue and Pool setup rather than my hacky fix.
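For illustration only, here is a minimal sketch of what that Pool-based setup might look like, reusing the same helper functions, shared_dict, and namespace as the answer above. The fixed worker count (processes=max_jobs_running) replaces the manual is_alive() throttle. This is an assumption about the suggested approach, not code from the original author:

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    max_jobs_running = 4
    n = 100

    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    namespace.df = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    # A pool with a fixed number of workers replaces the manual
    # is_alive() bookkeeping: at most max_jobs_running tasks run at once.
    with multiprocessing.Pool(processes=max_jobs_running) as pool:
        pool.starmap(
            edit_df_in_shared_dict,
            [(shared_dict, namespace, i) for i in range(n)],
        )

    print(shared_dict[1])

Manager proxies such as shared_dict and namespace are picklable, so they can be passed to pool workers as ordinary arguments; the pool then ensures that no more than max_jobs_running tasks are talking to the manager at any one time.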

Regarding python - shutdown manager error "AttributeError: 'ForkAwareLocal' object has no attribute 'connection'" when using namespace and shared memory dict, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60049527/
