
python - Deadlock when calling task_done() because the task was already pulled by a new thread

Reposted. Author: 行者123. Updated: 2023-12-01 04:45:33

I'm experimenting with multithreading in Python (3.4) and have a question about the following code.

The code works well when my amount of work is greater than NUM_WORKER_THREADS; however, once the queue shrinks below NUM_WORKER_THREADS, a new iteration can pick up the same item because of the time between items.get() and task_done(). This results in a deadlock when task_done() is called.

What is the correct way to handle this?

import time
import threading

from queue import Queue
NUM_WORKER_THREADS = 8

def worker():
    try:
        while items.qsize() > 0:
            print("{} items left to process".format(items.qsize()))
            item = items.get()
            print("Processing {}".format(item))
            itemrec = getItemRecord(item) # external call to webservice ~3 second response.
            items.task_done()

    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)


# start counter to monitor performance
start = time.perf_counter()

items = Queue()
# get the items we need to work on for allocations
searchResults = getSearchResults() # external call to webservice

# add results of search to a collection
for itemid in searchResults:
    if itemid['recordtype'] == 'inventoryitem':
        items.put(itemid['id'])

for i in range(NUM_WORKER_THREADS):
    try:
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)

items.join()

# print end of execution performance counter
print('time:', time.perf_counter() - start)

Best Answer

I would use a sentinel to tell workers to shut down when there are no more work items left to process, rather than relying on the queue size, which is vulnerable to race conditions:
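The shutdown mechanism in the answer below relies on the two-argument form of iter(), which repeatedly calls a callable until it returns the sentinel value. A minimal standalone sketch of that pattern, independent of the answer's code:

```python
from queue import Queue

q = Queue()
for x in (1, 2, 3, None):  # None is the sentinel marking the end of work
    q.put(x)

# iter(callable, sentinel) calls q.get() repeatedly and stops cleanly
# as soon as it returns the sentinel, so no qsize() check is needed.
consumed = list(iter(q.get, None))
print(consumed)  # [1, 2, 3]
```

Because the loop ends exactly when the sentinel arrives, there is no window between checking the queue and taking an item, which is what eliminates the race condition.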

import time
import threading

from queue import Queue
NUM_WORKER_THREADS = 8

def worker():
    for item in iter(items.get, None):
        try:
            print("{} items left to process".format(items.qsize()))
            print("Processing {}".format(item))
        except Exception as inst:
            print("---------------EXCEPTION OCCURRED----------------")
            print(type(inst))
            print(inst.args)
            print(inst)
        finally:
            items.task_done()
    print("Got sentinel, shut down")
    items.task_done()

# start counter to monitor performance
start = time.perf_counter()

items = Queue()
# get the items we need to work on for allocations
searchResults = getSearchResults() # external call to webservice

# add results of search to a collection
for itemid in searchResults:
    if itemid['recordtype'] == 'inventoryitem':
        items.put(itemid['id'])

for _ in range(NUM_WORKER_THREADS):
    items.put(None) # Load a sentinel for each worker thread

for i in range(NUM_WORKER_THREADS):
    try:
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)

items.join()

# print end of execution performance counter
print('time:', time.perf_counter() - start)

Also note that you can do this more elegantly using the built-in thread pool Python provides (multiprocessing.dummy.Pool):

import time
from multiprocessing.dummy import Pool # Thread Pool

NUM_WORKER_THREADS = 8

def worker(item):
    try:
        print("Processing {}".format(item))
        itemrec = getItemRecord(item) # external call to webservice ~3 second response.
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)

# start counter to monitor performance
start = time.perf_counter()

# get the items we need to work on for allocations
searchResults = getSearchResults() # external call to webservice
pool = Pool(NUM_WORKER_THREADS)
pool.map(worker, [item['id'] for item in searchResults
                  if item['recordtype'] == 'inventoryitem'])
pool.close()
pool.join()


# print end of execution performance counter
print('time:', time.perf_counter() - start)
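On Python 3.2+, the standard library's concurrent.futures.ThreadPoolExecutor achieves the same thing as multiprocessing.dummy.Pool. A minimal sketch with a hypothetical worker body (the real getItemRecord is an external webservice call, so a stand-in is used here):

```python
from concurrent.futures import ThreadPoolExecutor

def worker(item):
    # Stand-in for the real work (e.g. the webservice call in the answer).
    return item * 2

# The executor distributes items across worker threads; map() preserves
# input order in its results, and the with-block joins all threads on exit.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(worker, [1, 2, 3]))

print(results)  # [2, 4, 6]
```

Like Pool.map, this needs no sentinels or task_done() bookkeeping: the pool tracks outstanding work itself and shuts the threads down when the with-block exits.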

Regarding "python - Deadlock when calling task_done() because the task was already pulled by a new thread", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29477084/
