gpt4 book ai didi

python - 在 Python 中,如何等待多个队列中的所有项目都被处理?

转载 作者:行者123 更新时间:2023-11-28 18:41:12 24 4
gpt4 key购买 nike

在下面的代码中,我有两个队列用于运行不同类型的线程。这些线程递归地添加到彼此的队列中(队列 1 获取一些信息,队列 2 处理它并将更多信息添加到队列 1)。

我想等到两个队列中的所有项目都处理完毕。目前我正在使用这段代码

queue.join()
out_queue.join()

问题是当第一个队列暂时用完了要做的事情时,它就关闭了,所以它再也看不到队列 2(out_queue)在那之后向它添加了什么。

我在 time.sleep() 函数中添加了一个非常糟糕的修复程序,到 30 秒时,两个队列都已填满,不会用完。

修复此问题的标准 Python 方法是什么?我是否必须只有一个队列,并在其中标记项目以说明它们应该由哪个线程处理?

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, queue, out_queue):
threading.Thread.__init__(self)
self.queue = queue
self.out_queue = out_queue

def run(self):
while True:
row = self.queue.get()

request = urllib2.Request(row[0], None, req_headers)

# ... some processing ...

self.out_queue.put([row, http_status, page])

self.queue.task_done()

class DatamineThread(threading.Thread):
def __init__(self, out_queue, mysql):
threading.Thread.__init__(self)
self.out_queue = out_queue
self.mysql = mysql

def run(self):
while True:
row = self.out_queue.get()

# ... some processing ...

queue.put(newrow)

self.out_queue.task_done()

queue = Queue.Queue()
out_queue = Queue.Queue()

for i in range(URL_THREAD_COUNT):
t = ThreadUrl(queue, out_queue)
t.setDaemon(True)
t.start()

#populate queue with data
for row in rows:
queue.put(row)

#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd = None, db='db')

#spawn DatamineThread, if you have multiple, make sure each one has it's own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)

#wait on the queue until everything has been processed
queue.join()
out_queue.join()

最佳答案

假设两个队列分别命名为queue_1queue_2

  • 正确的解决方案:单独跟踪待处理作品的总数( with a lock ),然后等到值为零(使用条件变量)。

  • 正确的解决方案,但不推荐:使用未记录的 API/内部方法...

    while True:
    with queue_1.mutex, queue_2.mutex:
    if queue_1.unfinished_tasks==0 and queue_2.unfinished_tasks==0:
    break
    queue_1.join()
    queue_2.join()
  • 错误的解决方案:

    while not (queue_1.empty() and queue_2.empty()):
    queue_1.join()
    queue_2.join()

    这是不正确的,因为在 queue_2.join 和下一个 while 检查之后;并且有可能两个队列中都没有项目但任务尚未完成(正在处理一个元素)

    例如,在下面的代码中:

    #!/bin/python
    from threading import Thread
    from queue import Queue
    import time


    queue_1 = Queue()
    queue_2 = Queue()

    def debug(): print(queue_1.qsize(), queue_2.qsize())
    def run_debug():
    while True:
    time.sleep(0.2)
    debug()
    Thread(target=run_debug).start()

    def run_1():
    while True:
    value=queue_1.get()
    print("get value", value)
    time.sleep(1)
    if value:
    print("put value", value-1)
    queue_2.put(value-1)
    time.sleep(0.5)
    queue_1.task_done()


    def run_2():
    while True:
    value=queue_2.get()
    print("get value", value)
    time.sleep(1)
    if value:
    print("put value", value-1)
    queue_1.put(value-1)
    time.sleep(0.5)
    queue_2.task_done()


    thread_1 = Thread(target=run_1)
    thread_2 = Thread(target=run_2)
    thread_1.start()
    thread_2.start()

    queue_1.put(3)

    # wait for both queues
    while not (queue_1.empty() and queue_2.empty()):
    queue_1.join()
    queue_2.join()

    print("done")
    # (add code to stop the threads properly)

    输出是

    get value 3
    get value 2
    get value 1
    done
    get value 0

关于python - 在 Python 中,如何等待多个队列中的所有项目都被处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25877796/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com