gpt4 book ai didi

python - 分布式 TensorFlow - 不运行一些 worker

转载 作者:太空宇宙 更新时间:2023-11-03 14:06:38 24 4
gpt4 key购买 nike

我正在尝试获取分布式 TensorFlow 工作的一个非常简单的示例。但是,我有一个在运行之间不确定地出现的错误。在某些运行中,它工作得很好。输出如下内容:

Worker 2 | step 0
Worker 0 | step 0
Worker 1 | step 0
Worker 3 | step 0
Worker 2 | step 1
Worker 0 | step 1
Worker 1 | step 1
Worker 3 | step 1
...

然而,每隔一段时间,一个或多个 worker 无法运行,导致输出如下:

Worker 0 | step 0
Worker 3 | step 0
Worker 0 | step 1
Worker 3 | step 1
Worker 0 | step 2
Worker 3 | step 2
...

如果我无限期地运行循环,似乎失踪的 worker 总是在某个时间点启动,但只是在几分钟后启动,这是不切实际的。

我发现有两件事可以解决问题(但会使程序无用): 1. 不在 with tf.device(tf.train.replica_device_setter()) 中声明任何 tf 变量范围。如果我什至声明一个变量(例如下面的 nasty_var),问题就会开始出现。和 2. 设置 is_chief参数 tf.train.MonitoredTrainingSession()True对于所有 worker 。这会导致即使声明了变量,错误也会消失,但让所有的 worker 都成为首席似乎是错误的。我目前在下面设置它的方式 - is_chief=(task_index == 0) - 直接取自 TensorFlow 教程。

这是我能得到的最简单的代码来重现这个问题。 (您可能需要运行多次才能看到错误,但它几乎总是在 5 次运行中出现

from multiprocessing import Process
import tensorflow as tf
from time import sleep
from numpy.random import random_sample

cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'],
'worker': ['localhost:2223',
'localhost:2224',
'localhost:2225',
'localhost:2226']})


def create_worker(task_index):
server = tf.train.Server(cluster, job_name='worker', task_index=task_index)

with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
nasty_var = tf.Variable(0) # This line causes the problem. No issue when this is commented out.

with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0)):
for step in xrange(10000):
sleep(random_sample()) # Simulate some work being done.
print 'Worker %d | step %d' % (task_index, step)


def create_ps(task_index):
param_server = tf.train.Server(cluster, job_name='ps',
task_index=task_index)
param_server.join()

# Launch workers and ps in separate processes.
processes = []
for i in xrange(len(cluster.as_dict()['worker'])):
print 'Forking worker process ', i
p = Process(target=create_worker, args=[i])
p.start()
processes.append(p)

for i in xrange(len(cluster.as_dict()['ps'])):
print 'Forking ps process ', i
p = Process(target=create_ps, args=[i])
p.start()
processes.append(p)

for p in processes:
p.join()

最佳答案

我猜这里的原因是 tf.train.MonitoredTrainingSession 中的隐式协调协议(protocol)。开始,已实现 here :

  • 如果这个session是chief:

    • 运行变量初始化操作。
  • Else(如果本届 session 不是首席):

    • 运行一个操作来检查变量是否已经初始化。
    • 虽然任何变量还没有被初始化。
      • 等待 30 秒。
      • 尝试创建一个新 session ,并检查变量是否已初始化。

(我在 video about Distributed TensorFlow 中讨论了该协议(protocol)背后的基本原理。)

当每个 session 都是首席时,或者没有要初始化的变量时,tf.train.MonitoredTrainingSession 将始终立即启动。但是,一旦只有一个变量,而且你只有一个主管,你就会看到非主管员工必须等待主管行动。

使用此协议(protocol)的原因是它对各种失败的进程都很稳健,并且与典型的分布式训练作业的预期运行时间相比,延迟虽然在单个进程上运行所有内容时非常明显,但很短。

查看the implementation again ,似乎这个 30 秒超时应该是可配置的(作为 tf.train.SessionManager()recovery_wait_secs 参数),但目前无法在创建 tf. train.MonitoredTrainingSession,因为它使用一组硬编码的参数 for creating a session manager .这似乎是 API 中的一个疏忽,所以请随时在 GitHub issues page 上打开一个功能请求。 !

关于python - 分布式 TensorFlow - 不运行一些 worker ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42986653/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com