- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我第一次涉足 python mutliprocessing 模块,但遇到了一些问题。我非常熟悉线程模块,但我需要确保我正在执行的进程是并行运行的。
这是我正在尝试做的事情的概要。请忽略未声明的变量/函数之类的东西,因为我无法完整粘贴我的代码。
import multiprocessing
import time
def wrap_func_to_run(host, args, output):
output.append(do_something(host, args))
return
def func_to_run(host, args):
return do_something(host, args)
def do_work(server, client, server_args, client_args):
server_output = func_to_run(server, server_args)
client_output = func_to_run(client, client_args)
#handle this output and return a result
return result
def run_server_client(server, client, server_args, client_args, server_output, client_output):
server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
server_process.start()
client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
client_process.start()
server_process.join()
client_process.join()
#handle the output and return some result
def run_in_parallel(server, client):
#set up commands for first process
server_output = client_output = []
server_cmd = "cmd"
client_cmd = "cmd"
process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
process_one.start()
#set up second process to run - but this one can run here
result = do_work(server, client, "some server args", "some client args")
process_one.join()
#use outputs above and the result to determine result
return final_result
def main():
#grab client
client = client()
#grab server
server = server()
return run_in_parallel(server, client)
if __name__ == "__main__":
main()
这是我遇到的错误:
Error in sys.exitfunc:
Traceback (most recent call last):
File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
p.join()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
我尝试了很多不同的方法来解决这个问题,但我觉得我使用这个模块的方式有问题。
编辑:
所以我创建了一个文件,该文件将通过模拟客户端/服务器及其所做的工作来重现这一点——另外我错过了一个重要的点,那就是我在 unix 中运行它。另一个重要信息是,在我的实际案例中,do_work
涉及使用 os.fork()
。我无法在不使用 os.fork()
的情况下重现错误,所以我假设问题就在那里。在我的真实案例中,那部分代码不是我的,所以我把它当作一个黑盒子(可能是我的错误)。无论如何,这是要重现的代码 -
#!/usr/bin/python
import multiprocessing
import time
import os
import signal
import sys
class Host():
def __init__(self):
self.name = "host"
def work(self):
#override - use to simulate work
pass
class Server(Host):
def __init__(self):
self.name = "server"
def work(self):
x = 0
for i in range(10000):
x+=1
print x
time.sleep(1)
class Client(Host):
def __init__(self):
self.name = "client"
def work(self):
x = 0
for i in range(5000):
x+=1
print x
time.sleep(1)
def func_to_run(host, args):
print host.name + " is working"
host.work()
print host.name + ": " + args
return "done"
def do_work(server, client, server_args, client_args):
print "in do_work"
server_output = client_output = ""
child_pid = os.fork()
if child_pid == 0:
server_output = func_to_run(server, server_args)
sys.exit(server_output)
time.sleep(1)
client_output = func_to_run(client, client_args)
# kill and wait for server to finish
os.kill(child_pid, signal.SIGTERM)
(pid, status) = os.waitpid(child_pid, 0)
return (server_output == "done" and client_output =="done")
def run_server_client(server, client, server_args, client_args):
server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
print "Starting server process"
server_process.start()
client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
print "Starting client process"
client_process.start()
print "joining processes"
server_process.join()
client_process.join()
print "processes joined and done"
def run_in_parallel(server, client):
#set up commands for first process
server_cmd = "server command for run_server_client"
client_cmd = "client command for run_server_client"
process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
print "Starting process one"
process_one.start()
#set up second process to run - but this one can run here
print "About to do work"
result = do_work(server, client, "server args from do work", "client args from do work")
print "Joining process one"
process_one.join()
#use outputs above and the result to determine result
print "Process one has joined"
return result
def main():
#grab client
client = Client()
#grab server
server = Server()
return run_in_parallel(server, client)
if __name__ == "__main__":
main()
如果我在 do_work
中删除对 os.fork()
的使用,我不会收到错误并且代码的行为与我之前预期的一样(除了对于我接受为我的错误/误解的输出的传递)。我可以将旧代码更改为不使用 os.fork(),但我也想知道为什么会导致此问题,以及是否有可行的解决方案。
编辑 2:
我开始研究一个在接受的答案之前省略 os.fork() 的解决方案。这是我对可以完成的模拟工作量进行一些调整后得到的 -
#!/usr/bin/python
import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty
class Host():
def __init__(self):
self.name = "host"
def work(self, w):
#override - use to simulate work
pass
class Server(Host):
def __init__(self):
self.name = "server"
def work(self, w):
x = 0
for i in range(w):
x+=1
print x
time.sleep(1)
class Client(Host):
def __init__(self):
self.name = "client"
def work(self, w):
x = 0
for i in range(w):
x+=1
print x
time.sleep(1)
def func_to_run(host, args, w, q):
print host.name + " is working"
host.work(w)
print host.name + ": " + args
q.put("ZERO")
return "done"
def handle_queue(queue):
done = False
results = []
return_val = 0
while not done:
#try to grab item from Queue
tr = None
try:
tr = queue.get_nowait()
print "found element in queue"
print tr
except Empty:
done = True
if tr is not None:
results.append(tr)
for el in results:
if el != "ZERO":
return_val = 1
return return_val
def do_work(server, client, server_args, client_args):
print "in do_work"
server_output = client_output = ""
child_pid = os.fork()
if child_pid == 0:
server_output = func_to_run(server, server_args)
sys.exit(server_output)
time.sleep(1)
client_output = func_to_run(client, client_args)
# kill and wait for server to finish
os.kill(child_pid, signal.SIGTERM)
(pid, status) = os.waitpid(child_pid, 0)
return (server_output == "done" and client_output =="done")
def run_server_client(server, client, server_args, client_args, w, mq):
local_queue = multiprocessing.Queue()
server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
print "Starting server process"
server_process.start()
client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
print "Starting client process"
client_process.start()
print "joining processes"
server_process.join()
client_process.join()
print "processes joined and done"
if handle_queue(local_queue) == 0:
mq.put("ZERO")
def run_in_parallel(server, client):
#set up commands for first process
master_queue = multiprocessing.Queue()
server_cmd = "server command for run_server_client"
client_cmd = "client command for run_server_client"
process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
print "Starting process one"
process_one.start()
#set up second process to run - but this one can run here
print "About to do work"
#result = do_work(server, client, "server args from do work", "client args from do work")
run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
print "Joining process one"
process_one.join()
#use outputs above and the result to determine result
print "Process one has joined"
return_val = handle_queue(master_queue)
print return_val
return return_val
def main():
#grab client
client = Client()
#grab server
server = Server()
val = run_in_parallel(server, client)
if val:
print "failed"
else:
print "passed"
return val
if __name__ == "__main__":
main()
这段代码有一些调整的打印输出,只是为了准确地查看发生了什么。我使用 multiprocessing.Queue 跨进程存储和共享输出,并返回到我的主线程中进行处理。我认为这解决了我的问题的 python 部分,但我正在处理的代码中仍然存在一些问题。我唯一可以说的是,相当于 func_to_run
涉及通过 ssh 发送命令并获取任何错误以及输出。出于某种原因,这对于执行时间短的命令非常有效,但对于执行时间/输出更长的命令来说效果不佳。我尝试在此处的代码中使用截然不同的工作值对此进行模拟,但无法重现类似的结果。
编辑 3我正在使用的库代码(同样不是我的)使用 Popen.wait()
作为 ssh 命令,我刚刚读到这个:
Popen.wait()
Wait for child process to terminate. Set and return returncode attribute.Warning This will deadlock when using stdout=PIPE and/or stderr=PIPE and the >child process generates enough output to a pipe such that it blocks waiting for >the OS pipe buffer to accept more data. Use communicate() to avoid that.
我将代码调整为不缓冲,只在收到时打印,一切正常。
最佳答案
I can change the old code to not use
os.fork()
but I'd also like to know why this caused this problem and if there's a workable solution.
理解问题的关键是确切地知道是什么 fork()
做。 CPython 文档声明“Fork a child process”。但这假定您了解 C 库调用 fork()
.
glibc 的联机帮助页是这样说的:
fork()
creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...
这基本上就好像您获取了您的程序并复制了它的程序状态(堆、堆栈、指令指针等),但差异很小,并让它独立于原始程序执行。当这个子进程自然退出时,会使用exit()
这将触发 atexit()
multiprocessing
注册的处理程序模块。
你能做些什么来避免它?
os.fork()
: 使用 multiprocessing
相反,就像你现在正在探索一样import multiprocessing
在执行 fork()
之后,仅在必要时在 child 或 parent 中。_exit()
在子进程中(CPython 文档状态,“注意退出的标准方法是 sys.exit(n)。_exit() 通常只应在 fork() 之后的子进程中使用。”)关于Python 多处理 - AssertionError : can only join a child process,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37692262/
我正在测试设置SQLAlchemy以映射现有数据库。这个数据库是很久以前自动建立的,它是由我们不再使用的先前的第三方应用程序创建的,因此 undefined 某些预期的事情,例如外键约束。该软件将管理
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (28 个答案) 关闭 7 年前。 INNE
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (29 个回答) 关闭7年前. INNER J
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
一.先看一些最简单的例子 例子 Table A aid adate 1 a1 2&nb
数据库操作语句 7. 外连接——交叉查询 7.1 查询 7.2 等值连接 7.3 右外
我有两个表 'users' 和 'lms_users' class LmsUser belongs_to :user end class User has_one :lms_user
我试图避免在 Rails 中对我的 joins 进行字符串插值,因为我注意到将查询器链接在一起时灵活性会降低。 也就是说,我觉得 joins(:table1) 比 joins('inner join
我有这个代码 User.find(:all, :limit => 10, :joins => :user_points, :select => "users.*, co
我刚刚开始探索 Symfony2,我很惊讶它拥有如此多的强大功能。我开始做博客教程在: http://tutorial.symblog.co.uk/ 但使用的是 2.1 版而不是 2.0 我的问题是我
什么是 SQL JOIN什么是不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: http
我有两个 Hive 表,我正在尝试加入它们。这些表没有被任何字段聚集或分区。尽管表包含公共(public)键字段的记录,但连接查询始终返回 0 条记录。所有数据类型都是“字符串”数据类型。 连接查询很
我正在使用 Solr 的(4.0.0-beta)连接功能来查询包含具有父/子关系的文档的索引。连接查询效果很好,但我只能在搜索结果中获得父文档。我相信这是预期的行为。 但是,是否有可能在搜索结果中同时
我正在使用可用的指南/api/书籍自学 Rails,但我无法理解通过三种方式/嵌套 has_many :through 关联进行的连接。 我有用户与组相关联:通过成员(member)资格。 我在多对多
什么是 SQL JOIN,有哪些不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: htt
我正在尝试访问数据库的两个表。在商店里,我保留了一个事件列表,其中包含 Table Event id, name,datei,houri, dateF,Hourf ,capacity, age ,de
我有 4 个表:booking、address、search_address 和 search_address_log 表:(相关列) 预订:(pickup_address_id, dropoff_a
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我是一名优秀的程序员,十分优秀!