gpt4 book ai didi

Python多线程+多处理BrokenPipeError(子进程未退出?)

转载 作者:行者123 更新时间:2023-11-30 23:19:13 25 4
gpt4 key购买 nike

当使用 multiprocessing.JoinableQueue 生成进程的线程时,我收到 BrokenPipeError 。似乎是在程序完成工作并尝试退出之后发生的,因为它做了它应该做的所有事情。这是什么意思,有没有办法解决这个问题/安全忽略?

import requests
import multiprocessing
from multiprocessing import JoinableQueue
from queue import Queue
import threading


class ProcessClass(multiprocessing.Process):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.in_queue = in_queue
self.out_queue = out_queue
self.func = func

def run(self):
while True:
arg = self.in_queue.get()
self.func(arg, self.out_queue)
self.in_queue.task_done()


class ThreadClass(threading.Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.in_queue = in_queue
self.out_queue = out_queue
self.func = func

def run(self):
while True:
arg = self.in_queue.get()
self.func(arg, self.out_queue)
self.in_queue.task_done()


def get_urls(host, out_queue):
r = requests.get(host)
out_queue.put(r.text)
print(r.status_code, host)


def get_title(text, out_queue):
print(text.strip('\r\n ')[:5])


if __name__ == '__main__':
def test():

q1 = JoinableQueue()
q2 = JoinableQueue()

for i in range(2):
t = ThreadClass(get_urls, q1, q2)
t.daemon = True
t.setDaemon(True)
t.start()

for i in range(2):
t = ProcessClass(get_title, q2, None)
t.daemon = True
t.start()

for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
q1.put(host)

q1.join()
q2.join()

test()
print('Finished')

程序输出:

200 http://ibm.com
<!DOC
200 http://google.com
<!doc
200 http://yahoo.com
<!DOC
200 http://apple.com
<!DOC
200 http://amazon.com
<!DOC
Finished
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\Python\33\lib\multiprocessing\connection.py", line 313, in _recv_bytes
nread, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109]

The pipe has been ended

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "C:\Python\33\lib\threading.py", line 901, in _bootstrap_inner
self.run()
File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run
arg = self.in_queue.get()
File "C:\Python\33\lib\multiprocessing\queues.py", line 94, in get
res = self._recv()
File "C:\Python\33\lib\multiprocessing\connection.py", line 251, in recv
buf = self._recv_bytes()
File "C:\Python\33\lib\multiprocessing\connection.py", line 322, in _recv_bytes
raise EOFError
EOFError
....

(为其他线程消除相同的错误。)

如果我将 JoinableQueue 切换为多线程部分的queue.Queue,一切都会修复,但为什么?

最佳答案

发生这种情况是因为您让后台线程阻塞在 multiprocessing.Queue.get 中。主线程退出时调用,但仅在某些条件下发生:

  1. 守护线程正在 multiprocessing.Queue.get 上运行并阻塞。当主线程退出时。
  2. 一个multiprocessing.Process正在运行。
  3. multiprocessing上下文不是'fork' .

异常(exception)情况是告诉您 Connection 的另一端那multiprocessing.JoinableQueueget() 内部时正在聆听来电已发送 EOF 。一般来说,这意味着 Connection 的另一面已关闭。这是有道理的,这种情况发生在关闭期间 - Python 在退出解释器之前清理所有对象,并且清理的一部分涉及关闭所有打开的 Connection对象。我还没弄清楚为什么它只(并且总是)发生在 multiprocessing.Process 的情况下。已生成( fork ,这就是默认情况下在 Linux 上不会发生的原因)并且仍在运行。如果我创建 multiprocessing.Process 我什至可以重现它它只是睡在 while 中环形。不需要任何 Queue根本没有对象。无论出于何种原因,正在运行的、生成的子进程的存在似乎保证会引发异常。它可能只是导致事物被破坏的顺序正好适合竞争条件的发生,但这只是一个猜测。

无论如何,使用 queue.Queue而不是multiprocessing.JoinableQueue是修复它的好方法,因为您实际上并不需要 multiprocessing.Queue那里。您还可以通过将哨兵发送到其队列来确保后台线程和/或后台进程在主线程之前关闭。因此,将两者都设为 run方法检查哨兵:

def run(self):
for arg in iter(self.in_queue.get, None): # None is the sentinel
self.func(arg, self.out_queue)
self.in_queue.task_done()
self.in_queue.task_done()

完成后发送哨兵:

    threads = []
for i in range(2):
t = ThreadClass(get_urls, q1, q2)
t.daemon = True
t.setDaemon(True)
t.start()
threads.append(t)

p = multiprocessing.Process(target=blah)
p.daemon = True
p.start()
procs = []
for i in range(2):
t = ProcessClass(get_title, q2, None)
t.daemon = True
t.start()
procs.append(t)

for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
q1.put(host)

q1.join()
# All items have been consumed from input queue, lets start shutting down.
for t in procs:
q2.put(None)
t.join()
for t in threads:
q1.put(None)
t.join()
q2.join()

关于Python多线程+多处理BrokenPipeError(子进程未退出?),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26188677/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com