python - Joining a multiprocessing Queue takes a long time


In Python 2.7, I have implemented a multiprocessing scenario with multiple queues and consumers. The simplified idea is that I have a producer of jobs, which are fed to consumers that process the jobs, and an error handler that takes care of all the logging. Very much simplified, it all looks comparable to this:

import multiprocessing as mp
import Queue

job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
    job_queue.put(i)

def job_handler(job_queue, error_queue):
    print 'Job handler'
    while True:
        try:
            element = job_queue.get_nowait()
            print element
        except:
            # t1
            error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()
            job_queue.close()
            job_queue.join_thread()
            # t2
            return 1

def error_handler(error_queue):
    result = error_queue.get()
    if result == 'Error':
        error_queue.close()
        error_queue.join_thread()

if __name__ == '__main__':
    print 'Starting'
    p1 = mp.Process(target=error_handler, args=(error_queue,))
    p1.start()
    p2 = mp.Process(target=job_handler, args=(job_queue, error_queue))
    p2.start()

This basically works, but in my more complex program there is a very long delay (about 5 minutes) between the two commented points t1 and t2. So I have two questions:
  • Do I understand correctly that every process should call close() and join_thread() on all Queue objects it has used, to signal that it is done with them? I thought that child processes do this implicitly when they end, e.g. by returning, as documented here:

  • join_thread() Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.

    By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.


  • How can I find out why the joining takes so long?
Best Answer

    Found the following in the docs (docs.python.org):

    From the docs:
    Joining processes that use queues

    Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

    This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.



    As I understand it, a process, here p2 = job_handler, should not exit immediately after putting items onto a queue, in order to avoid losing queued data.
    I cannot find any explanation for the sentence "Otherwise you cannot be sure that processes which have put items on the queue will terminate."
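    The warning can be reproduced with a minimal sketch (an adaptation of the deadlock example from the multiprocessing docs, not taken from the question's code): a parent joins a child that has put a large item on a queue before anything has been taken off it.

    from multiprocessing import Process, Queue

    def fill(q):
        # The feeder thread still has to flush this large item to the pipe,
        # so the child cannot terminate until someone reads from the queue.
        q.put('X' * 1000000)

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=fill, args=(queue,))
        p.start()
        p.join()            # deadlocks: nothing ever drains the queue
        obj = queue.get()   # swapping the last two lines avoids the deadlock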

    Apart from the above, I would like to comment on your code. I recognize that this code is simplified.
  • Avoid placing code that is executed at startup outside of if __name__ == '__main__': (see the sketch after the snippet below).

    From the docs (Safe importing of main module): One should protect the “entry point” of the program by using if __name__ == '__main__':


    job_queue = mp.Queue()
    error_queue = mp.Queue()
    for i in range(10):
        job_queue.put(i)
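    One reason for this rule: with the "spawn" start method (the default on Windows, and available via mp.set_start_method in Python 3.4+), every child process re-imports the main module, so module-level code such as the queue creation above runs again in each child. A minimal sketch (hypothetical, not part of the question) that makes this visible:

    import multiprocessing as mp
    import os

    # Without the __main__ guard this line runs once in the parent
    # and once more in every spawned child.
    print('module level, pid %d' % os.getpid())

    def worker():
        print('worker, pid %d' % os.getpid())

    if __name__ == '__main__':
        mp.set_start_method('spawn')   # Python 3.4+; not available in 2.7
        p = mp.Process(target=worker)
        p.start()
        p.join()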
  • .close() inside def job_handler / error_handler
    except:
        ...
        job_queue.close()

    This is wrong, because the job_handler process never puts messages onto this queue.
    The same applies to the error_handler process and error_queue.close().

  • From the docs:
    Indicate that no more data will be put on this queue by the current process.
    The background thread will quit once it has flushed all buffered data to the pipe.
    This is called automatically when the queue is garbage collected.


  • .join_thread() inside def job_handler / error_handler
    This is useless, because the job_handler process never puts messages onto this queue, so .join_thread() does nothing here.
    The same applies to the error_handler process.
    except:
        ...
        job_queue.join_thread()
        # t2

    def error_handler(error_queue):
        ...
        error_queue.close()
        error_queue.join_thread()
  • Use exit(1) instead of return 1. The error code '1' of a return cannot be caught with p2.exitcode, as the sketch below shows.
    Think of a process more as a program of its own than as a function.
    return 1
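    A small sketch (hypothetical function names, not from the question) showing the difference: the value returned by the target function is discarded, while sys.exit(1) shows up in Process.exitcode.

    import multiprocessing as mp
    import sys

    def returns_one():
        return 1        # the return value of the target is simply discarded

    def exits_one():
        sys.exit(1)     # sets the child's exit code

    if __name__ == '__main__':
        p = mp.Process(target=returns_one)
        p.start()
        p.join()
        print(p.exitcode)   # 0 - the returned 1 is not visible here

        q = mp.Process(target=exits_one)
        q.start()
        q.join()
        print(q.exitcode)   # 1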

  • Try the following:
    # t1
    error_queue.put('Error')
    error_queue.close()

    # Give the error_handler a chance to get a timeslice
    # (this needs "import time" at the top of the module)
    time.sleep(0.2)

    error_queue.join_thread()
    #job_queue.close()
    #job_queue.join_thread()
    # t2
    exit(1)

    Tested with Python 3.4.2 and Python 2.7.9.
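    Putting the points above together, a complete version could look roughly like the sketch below (one possible interpretation of the advice, not the answerer's verbatim code; the 0.2 s sleep is the workaround suggested above to give error_handler a time slice):

    import multiprocessing as mp
    import sys
    import time

    def job_handler(job_queue, error_queue):
        print('Job handler')
        while True:
            try:
                element = job_queue.get_nowait()
                print(element)
            except:                           # bare except kept as in the question
                # t1
                error_queue.put('Error')
                error_queue.close()           # this process did put on error_queue
                time.sleep(0.2)               # give error_handler a chance to run
                error_queue.join_thread()
                # no close()/join_thread() on job_queue: this process never put to it
                # t2
                sys.exit(1)

    def error_handler(error_queue):
        result = error_queue.get()
        if result == 'Error':
            print('Error received')

    if __name__ == '__main__':
        print('Starting')
        job_queue = mp.Queue()
        error_queue = mp.Queue()
        for i in range(10):
            job_queue.put(i)

        p1 = mp.Process(target=error_handler, args=(error_queue,))
        p1.start()
        p2 = mp.Process(target=job_handler, args=(job_queue, error_queue))
        p2.start()
        p2.join()
        p1.join()
        print(p2.exitcode)   # 1, set by sys.exit(1) in the child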

    Regarding "python - Joining a multiprocessing Queue takes a long time", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42350933/
