
python - tensorflow + joblib: limited to 8 processes?

Reposted. Author: 行者123. Updated: 2023-12-05 06:32:26

I built a statistical estimator using TensorFlow. I followed sklearn's estimator design, so I have a class that wraps everything, including importing TensorFlow and starting the TF session (if TF is imported outside the class, nothing runs in parallel at all).

I need to run this estimator many times on random data to see its empirical distribution, so I use joblib to parallelize the code that creates the data, constructs the estimator object, and runs the estimation on the data. I work on a Linux server with 64 cores (and plenty of memory), where I have run much larger jobs than this, also with joblib. However, when I try to run the TF-based code, I can only run 8 processes. If I try 9, only 8 show up in top, and when those 8 finish, joblib never dispatches another batch and never returns, or it fails with the following error message:

"BrokenProcessPool: A process in the executor was terminated abruptly while the future was running or pending."

If I limit myself to 8 processes, everything works fine. I tried switching joblib's backend to dask.parallel and got the same behavior. That backend gave me more information, with repeated messages saying:

"distributed.nanny - WARNING - Worker process 7602 was killed by unknown signal"

I would like to be able to run more than 8 processes. The question is: is this a hard limit, or can I change it via some TF parameter? Can I work around it in any way? I believe the limit is tied to TensorFlow, because once 8 processes are running (and they take hours), I cannot run anything else with TensorFlow on that machine.
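One workaround often suggested for hangs when forking a process that has already initialized a multi-threaded runtime (an editorial aside, not something the original poster tried) is to start workers with the "spawn" start method, so that TensorFlow's thread and lock state is never inherited through fork:

```python
import multiprocessing as mp

def run_one(seed):
    # In a real worker you would `import tensorflow` here, so each spawned
    # process builds its own TF runtime rather than a forked copy.
    return seed + 1

if __name__ == "__main__":
    # "spawn" starts fresh interpreters; "fork" (the Linux default) copies
    # the parent's threads-and-locks state, which multi-threaded runtimes
    # like TensorFlow often cannot survive.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=4) as pool:
        print(pool.map(run_one, range(4)))
```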

Thanks for your help!

The following code reproduces the error:

from sklearn.base import TransformerMixin
import numpy as np
from joblib import Parallel, delayed

class MyEstimator(TransformerMixin):
    def __init__(self):
        import tensorflow as tf
        self._tf = tf
        self._graph = tf.Graph()
        with self._graph.as_default():
            self.session = self._tf.Session()
            A0 = np.eye(10, 2)
            self.a_var = a_var = tf.Variable(A0, name='a_var', dtype=tf.float64)
            self._x = x = tf.placeholder(dtype=tf.float64)
            self._y = y = tf.placeholder(dtype=tf.float64)
            w = tf.tensordot(a_var, x, axes=0)
            self.f = tf.reduce_mean((y - w)**2)

    def fit(self, x, y):
        # self.session.run(
        #     self._tf.global_variables_initializer())
        self._f = self.session.run(self.f, feed_dict={self._x: x, self._y: y, self.a_var: np.eye(10, 2)})
        return self

def run_estimator():
    my_est = MyEstimator()
    x = np.random.normal(0, 1, 10)
    y = np.random.normal(0, 1, 10)
    my_est.fit(x, y)

Parallel(n_jobs=16)(delayed(run_estimator)() for _ in range(16))

I am using Linux, Python 3.6.3, TensorFlow 1.7.0, and joblib 0.12.

Best Answer

A few months later, I found a solution using a TensorFlow server: https://www.tensorflow.org/deploy/distributed

from sklearn.base import TransformerMixin
import numpy as np
from joblib import Parallel, delayed

class MyEstimator(TransformerMixin):
    def __init__(self, target):
        import tensorflow as tf
        self._tf = tf
        self._graph = tf.Graph()
        with self._graph.as_default():
            config = self._tf.ConfigProto(
                intra_op_parallelism_threads=1,
                inter_op_parallelism_threads=1,
                device_count={"CPU": 4},
                use_per_session_threads=True)
            config.graph_options.optimizer_options.global_jit_level = tf.OptimizerOptions.ON_1
            pool = config.session_inter_op_thread_pool.add()
            pool.num_threads = 1

            # The session connects to the shared local server via its target
            # string instead of spinning up its own runtime.
            self.session = self._tf.Session(target)
            A0 = np.eye(10, 2)
            self.a_var = a_var = tf.Variable(A0, name='a_var', dtype=tf.float64)
            self._x = x = tf.placeholder(dtype=tf.float64)
            self._y = y = tf.placeholder(dtype=tf.float64)
            w = tf.tensordot(a_var, x, axes=0)
            self.f = tf.reduce_mean((y - w)**2)

    def fit(self, x, y):
        # self.session.run(
        #     self._tf.global_variables_initializer())
        self._f = self.session.run(self.f, feed_dict={self._x: x, self._y: y, self.a_var: np.eye(10, 2)})
        return self

def run_estimator(target):
    my_est = MyEstimator(target)
    x = np.random.normal(0, 1, 10)
    y = np.random.normal(0, 1, 10)
    my_est.fit(x, y)
    return 1

import tensorflow as tf
server = tf.train.Server.create_local_server()
Parallel(n_jobs=16)(delayed(run_estimator)(server.target) for _ in range(16))
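A lighter-weight complement (an editorial assumption, not part of the accepted answer) is to cap each worker's internal thread pools through environment variables set before TensorFlow is imported. OMP_NUM_THREADS is the standard OpenMP knob; the TF_NUM_*_THREADS variables are honored by newer TensorFlow releases and may not exist in TF 1.7, where the ConfigProto fields shown above are the equivalent:

```python
import os

# Assumption: these must be set before `import tensorflow` to take effect.
# OMP_NUM_THREADS caps OpenMP/MKL pools; the TF_NUM_*_THREADS variables
# apply to newer TF releases (in TF 1.x, use ConfigProto instead).
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["TF_NUM_INTRAOP_THREADS"] = "1"
os.environ["TF_NUM_INTEROP_THREADS"] = "1"
```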

Regarding "python - tensorflow + joblib: limited to 8 processes?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51343207/
