gpt4 book ai didi

python - Tensorflow多处理; UnknownError : Could not start gRPC server

转载 作者:行者123 更新时间:2023-12-03 13:08:54 25 4
gpt4 key购买 nike

我正在研究大型数据集上的粗麻布矩阵。我正在尝试在多个CPU上并行执行这些计算。我的设置当前有1个节点和10个CPU。我正在使用Python 2.7

我为代码编写了一个小抽象,以更好地理解分布式 tensorflow 。下面是错误

2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
self._server_def.SerializeToString(), status)
File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
pywrap_tensorflow.TF_GetCode(status)) UnknownError: Could not start gRPC server

每次运行代码时,我都会收到此错误。但是,它会进一步发展为我如下设置的两个过程之一的输出
> `2017-07-23 16:27:48.605617: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session fe9fd6a338e2c9a7 with config: 

2017-07-23 16:27:48.607126: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 3560417f98b00dea with config:

[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3

在此它继续等待下一个
ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n stack = [s.strip() for s in traceback.format_stack()]']
==================================
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:

我在这里有2个问题
  • 如何解决有关Grpc的此错误
  • 我已经使用Manager()设置了一个多处理队列“结果”,并在设置过程时将其传递给两个工作人员。我希望一旦达到条件,每个进程就会将其Job ID写入队列,但是似乎队列始终包含最后完成的进程。这是否可能意味着队列中的某个位置被另一个进程
  • 覆盖

    [{'worker': 0}, {'worker': 0}]



    我可以使用多处理队列在tensorflow上两个不同进程上运行的两个 session 之间共享字典吗?

    下面是我的代码
    # build a python mutliprocess.py
    import multiprocessing
    import time
    import tensorflow as tf
    from tensorflow.contrib.training import HParams
    import os
    import psutil
    import numpy as np
    from tensorflow.python.client import device_lib
    from resnet import *
    import Queue

    cluster_spec ={"ps": ["localhost:2226"
    ],
    "worker": [
    "localhost:2227",
    "localhost:2228"]}

    cluster = tf.train.ClusterSpec(cluster_spec)
    im_Test = np.linspace(1,10,10)

    def model_fun(input):
    print multiprocessing.current_process().name
    return input

    def cifar10(device,return_dict,result_t):
    params = HParams(cluster=cluster,
    job_name = device[0],
    task_index = device[1])

    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
    input_img=[]
    true_lab=[]

    if params.job_name == "ps":
    ##try and wait for all the wokers t
    serv.join()
    elif params.job_name == "worker":
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/replica:0/task:%d" % params.task_index,
    cluster=cluster)):
    # with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
    # with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
    input_img = tf.placeholder(dtype=tf.float32, shape=[10,])
    with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
    hess_op = model_fun(input_img)
    global_step = tf.contrib.framework.get_or_create_global_step()
    sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
    global_step=global_step,
    init_op=tf.initialize_all_variables(),logdir='/tmp/mydir')
    with sv.prepare_or_wait_for_session(serv.target) as sess:
    step = 0
    while not sv.should_stop() :
    hess = sess.run(hess_op, feed_dict={input_img:im_Test })
    print(np.array(hess))
    print multiprocessing.current_process().name
    step += 1
    if(step==3):
    return_dict[params.job_name] = params.task_index
    result_t.put(return_dict)
    break
    sv.stop()
    sess.close()


    return

    if __name__ == '__main__':

    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()
    return_dict = manager.dict()
    processes = []
    devices = [['ps', 0],
    ['worker', 0],
    ['worker', 1]
    ]

    for i in (devices):
    start_time = time.time()
    proc = multiprocessing.Process(target=cifar10,args=(i,return_dict,result))
    processes.append(proc)
    proc.start()

    for p in processes:
    p.join()

    # print return_dict.values()
    kill = []
    while True:
    if result.empty() == True:
    break
    kill.append(result.get())
    print kill


    print("time taken = %d" % (start_time - time.time()))

    最佳答案

    在我的情况下,我发现ps会引发此错误,并且在提交tensorflowonspark工作 yarn 簇模式时,woker等待响应。
    ps错误如下

    2018-01-17 11:08:46,366 INFO (MainThread-7305) Starting TensorFlow ps:0 on cluster node 0 on background process 2018-01-17 11:08:56,085 INFO (MainThread-7395) 0: ======== ps:0 ======== 2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Cluster spec: {'ps': ['172.16.5.30:33088'], 'worker': ['172.16.5.22:41428', '172.16.5.30:33595']} 2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Using CPU 2018-01-17 11:08:56.087452: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA E0117 11:08:56.088501182 7395 ev_epoll1_linux.c:1051] grpc epoll fd: 10 E0117 11:08:56.088860707 7395 server_chttp2.c:38] {"created":"@1516158536.088783549","description":"No address added out of total 1 resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1516158536.088779164","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1516158536.088771177","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088767669","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1516158536.088778651","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088776541","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}]} Process Process-2: Traceback (most recent call last): File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000001/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 269, in wrapper_fn File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/pyfiles/mnist_dist.py", line 38, in map_fun cluster, server = ctx.start_cluster_server(1, args.rdma) File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 56, in start_cluster_server return TFNode.start_cluster_server(self, num_gpus, rdma) File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFNode.py", line 110, in start_cluster_server server = tf.train.Server(cluster, ctx.job_name, ctx.task_index) File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in init self._server_def.SerializeToString(), status) File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in exit c_api.TF_GetCode(self.status.status)) UnknownError: Could not start gRPC server


    woker:1 log

    2018-01-17 11:09:14.614244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0



    然后,我检查ps服务器中的端口。是的,该端口已被使用。

    因此,重新提交工作即可解决问题。

    但是,如果您每次运行代码都收到此错误,则应检查更多日志以查找原因。

    关于python - Tensorflow多处理; UnknownError : Could not start gRPC server,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45269790/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com