
Python multiprocessing - AssertionError: can only join a child process


This is my first foray into the Python multiprocessing module and I've run into some problems. I'm very familiar with the threading module, but I need to make sure the processes I'm executing are running in parallel.

Here's an outline of what I'm trying to do. Please ignore things like undeclared variables and functions, because I can't paste my code in full.

import multiprocessing
import time

def wrap_func_to_run(host, args, output):
    output.append(do_something(host, args))
    return

def func_to_run(host, args):
    return do_something(host, args)

def do_work(server, client, server_args, client_args):
    server_output = func_to_run(server, server_args)
    client_output = func_to_run(client, client_args)
    #handle this output and return a result
    return result

def run_server_client(server, client, server_args, client_args, server_output, client_output):
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
    server_process.start()
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
    client_process.start()
    server_process.join()
    client_process.join()
    #handle the output and return some result

def run_in_parallel(server, client):
    #set up commands for first process
    server_output = client_output = []
    server_cmd = "cmd"
    client_cmd = "cmd"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
    process_one.start()
    #set up second process to run - but this one can run here
    result = do_work(server, client, "some server args", "some client args")
    process_one.join()
    #use outputs above and the result to determine result
    return final_result

def main():
    #grab client
    client = client()
    #grab server
    server = server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()

Here's the error I'm getting:

Error in sys.exitfunc:
Traceback (most recent call last):
File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
p.join()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process

I've tried a number of different things to fix this, but I feel like there's something wrong with the way I'm using this module.

EDIT:

So I created a file that reproduces this by simulating the client/server and the work they do - also, I missed an important point, which is that I'm running this on unix. Another important piece of information: do_work in my actual case involves using os.fork(). I was unable to reproduce the error without using os.fork(), so I'm assuming the problem is there. In my real-world case that part of the code was not mine, so I was treating it as a black box (likely a mistake on my part). Anyways, here's the code to reproduce it -

#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys

class Host():
    def __init__(self):
        self.name = "host"

    def work(self):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self):
        x = 0
        for i in range(10000):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self):
        x = 0
        for i in range(5000):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args):
    print host.name + " is working"
    host.work()
    print host.name + ": " + args
    return "done"

def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)

    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)

    return (server_output == "done" and client_output == "done")

def run_server_client(server, client, server_args, client_args):
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"

def run_in_parallel(server, client):
    #set up commands for first process
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    result = do_work(server, client, "server args from do work", "client args from do work")
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return result

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()

If I remove the use of os.fork() in do_work, I don't get the error and the code behaves how I would have expected it to (except for the passing of outputs, which I've accepted as my mistake/misunderstanding). I can change the old code to not use os.fork(), but I'd also like to know why this caused the problem and whether there's a workable solution.
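The failing interaction can be distilled to just a few lines. Here is a minimal sketch (the worker function is a stand-in for any child task) that should reproduce the same "Error in sys.exitfunc" traceback on unix:

#!/usr/bin/python

import multiprocessing
import os
import sys
import time

def worker():
    time.sleep(2)

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()
    child_pid = os.fork()
    if child_pid == 0:
        # The fork()ed child inherits both the Process object p and the
        # atexit handler that multiprocessing registered on import.
        # sys.exit() runs that handler, which calls p.join() -- but p's
        # recorded parent pid is not this child's pid, so join() raises
        # "AssertionError: can only join a child process".
        sys.exit(0)
    os.waitpid(child_pid, 0)
    p.join()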

EDIT 2:

I started working on a solution that omits os.fork() before the accepted answer arrived. Here's what I have, with some adjustment to the amount of simulated work that can be done -

#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty

class Host():
    def __init__(self):
        self.name = "host"

    def work(self, w):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args, w, q):
    print host.name + " is working"
    host.work(w)
    print host.name + ": " + args
    q.put("ZERO")
    return "done"

def handle_queue(queue):
    done = False
    results = []
    return_val = 0
    while not done:
        #try to grab item from Queue
        tr = None
        try:
            tr = queue.get_nowait()
            print "found element in queue"
            print tr
        except Empty:
            done = True
        if tr is not None:
            results.append(tr)
    for el in results:
        if el != "ZERO":
            return_val = 1
    return return_val

# NOTE: stale fork()-based version kept for reference; it still calls the
# old two-argument func_to_run and is no longer invoked below.
def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)

    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)

    return (server_output == "done" and client_output == "done")

def run_server_client(server, client, server_args, client_args, w, mq):
    local_queue = multiprocessing.Queue()
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"
    if handle_queue(local_queue) == 0:
        mq.put("ZERO")

def run_in_parallel(server, client):
    #set up commands for first process
    master_queue = multiprocessing.Queue()
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    #result = do_work(server, client, "server args from do work", "client args from do work")
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return_val = handle_queue(master_queue)
    print return_val
    return return_val

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    val = run_in_parallel(server, client)
    if val:
        print "failed"
    else:
        print "passed"
    return val

if __name__ == "__main__":
    main()

This code has some tweaked printouts just to see exactly what's happening. I used a multiprocessing.Queue to store and share the outputs across the processes and back into my main thread to be handled. I think this solves the Python portion of my problem, but there are still some issues in the code I'm working on. The only other thing I can say is that the equivalent of func_to_run involves sending a command over ssh and grabbing any stderr along with the output. For some reason this works perfectly fine for commands with a short execution time, but not well for commands with a much longer execution time/output. I tried simulating this with the drastically different work values in the code here, but haven't been able to reproduce similar results.

EDIT 3:

The library code I'm using (again, not mine) uses Popen.wait() for the ssh commands, and I just read this:

Popen.wait() Wait for child process to terminate. Set and return returncode attribute.

Warning: This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.

I adjusted the code to not buffer and just print the output as it is received, and everything works.
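Two deadlock-safe patterns for this are sketched below; the ssh command line is illustrative, not the actual library call:

import subprocess

cmd = ["ssh", "somehost", "some-long-running-command"]

# Option 1: communicate(), as the docs recommend. It drains stdout and
# stderr concurrently while waiting, so the child can never block on a
# full OS pipe buffer.
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
print "exit status:", proc.returncode

# Option 2: stream output as it arrives (what "print as it is received"
# amounts to), merging stderr into stdout.
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(proc.stdout.readline, ''):
    print line,            # trailing comma: line already ends with \n
proc.wait()                # safe now that the pipe has been drained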

Best Answer

I can change the old code to not use os.fork() but I'd also like to know why this caused this problem and if there's a workable solution.

The key to understanding the problem is knowing exactly what fork() does. The CPython documentation states "Fork a child process." But this presumes you understand the C library call fork().

Here's what glibc's manpage says about it:

fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...

It's basically as if you took your program, made a copy of its program state (heap, stack, instruction pointer, etc.) with small differences, and let it execute independently of the original. When this child process exits naturally, it uses exit(), and that triggers the atexit() handlers registered by the multiprocessing module.
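The inheritance of atexit handlers across fork() can be seen in isolation with a small sketch (an illustration, not part of the original answer):

import atexit
import os
import sys

def handler():
    print "atexit handler running in pid %d" % os.getpid()

atexit.register(handler)

child_pid = os.fork()
if child_pid == 0:
    sys.exit(0)            # child: the inherited handler fires here too
os.waitpid(child_pid, 0)   # parent: the handler fires again at normal exit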

What can you do to avoid it?

  • Omit os.fork(): use multiprocessing instead, as you are now exploring.
  • Probably works: import multiprocessing only after performing the fork(), and only where necessary in the child or parent.
  • Use _exit() in the child (the CPython docs state: "Note The standard way to exit is sys.exit(n). _exit() should normally only be used in the child process after a fork()."); a minimal sketch of this option follows after the link below.

https://docs.python.org/2/library/os.html#os._exit
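A minimal sketch of the third option, assuming the fork()ed child needs no cleanup that atexit handlers would otherwise perform:

import os

child_pid = os.fork()
if child_pid == 0:
    # ... do the child's work ...
    os._exit(0)    # exits immediately, skipping atexit handlers, so
                   # multiprocessing's _exit_function never runs here
(pid, status) = os.waitpid(child_pid, 0)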

Regarding "Python multiprocessing - AssertionError: can only join a child process", see the original question on Stack Overflow: https://stackoverflow.com/questions/37692262/
