python - Queue doesn't process all elements when there are many threads

Reposted. Author: 行者123. Updated: 2023-11-28 18:28:45

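I've noticed that when I have many threads pulling elements from a queue, fewer elements are processed than I put into the queue. It is sporadic, but it seems to happen about half the time when I run the following code.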

#!/bin/env python

from threading import Thread
import httplib, sys
from Queue import Queue
import time
import random

concurrent = 500
num_jobs = 500

results = {}

def doWork():
    while True:
        result = None
        try:
            result = curl(q.get())
        except Exception as e:
            print "Error when trying to get from queue: {0}".format(str(e))

        if results.has_key(result):
            results[result] += 1
        else:
            results[result] = 1

        try:
            q.task_done()
        except:
            print "Called task_done when all tasks were done"

def curl(ourl):
    result = 'all good'
    try:
        time.sleep(random.random() * 2)
    except Exception as e:
        result = "error: %s" % str(e)
    except:
        result = str(sys.exc_info()[0])
    finally:
        return result or "None"

print "\nRunning {0} jobs on {1} threads...".format(num_jobs, concurrent)

q = Queue()

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()

for x in range(num_jobs):
    q.put("something")

try:
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

total_responses = 0
for result in results:
    num_responses = results[result]
    print "{0}: {1} time(s)".format(result, num_responses)
    total_responses += num_responses

print "Number of elements processed: {0}".format(total_responses)

Best answer

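Tim Peters hit the nail on the head in his comment. The problem is that the results are tallied from multiple threads without the protection of any kind of mutex. The has_key check and the assignment that follows it are separate steps, so a thread can be suspended between them. That allows something like this to happen: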

thread A gets result: "all good"
thread A checks results[result]
thread A sees no such key
thread A suspends # <-- before counting its result
thread B gets result: "all good"
thread B checks results[result]
thread B sees no such key
thread B sets results['all good'] = 1
thread C ...
thread C sets results['all good'] = 2
thread D ...
thread A resumes # <-- and remembers it needs to count its result still
thread A sets results['all good'] = 1 # resetting previous work!
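
One way to close that window is to guard the check-and-update with a mutex. The following is a minimal sketch written for Python 3, not the asker's final code: `count_results`, `results_lock`, and the smaller worker and job counts are assumptions made for illustration, and `curl` is a stand-in that only sleeps, as in the question.

```python
import random
import threading
import time
from queue import Queue


def curl(ourl):
    # Stand-in for the real request, as in the question: sleep briefly.
    time.sleep(random.random() * 0.01)
    return "all good"


def count_results(num_workers=20, num_jobs=100):
    """Run num_jobs jobs on num_workers threads and tally results under a lock."""
    q = Queue()
    results = {}
    results_lock = threading.Lock()  # hypothetical name; protects `results`

    def do_work():
        while True:
            item = q.get()
            try:
                result = curl(item)
                # The read-modify-write on the dict happens while holding
                # the lock, so no other thread can interleave with it.
                with results_lock:
                    results[result] = results.get(result, 0) + 1
            finally:
                # Always mark the job done, even if curl() raised.
                q.task_done()

    for _ in range(num_workers):
        threading.Thread(target=do_work, daemon=True).start()

    for _ in range(num_jobs):
        q.put("something")

    q.join()  # returns once task_done() has been called for every job
    return results


if __name__ == "__main__":
    totals = count_results()
    print("Number of elements processed: {0}".format(sum(totals.values())))
```

Because the tally is updated before `task_done()` is called, every count is already recorded by the time `q.join()` returns.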

A more typical workflow would have a results queue that the main thread listens on.

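import queue

workq = queue.Queue()
resultsq = queue.Queue()

# make_work and do_work are placeholders for your own functions.
make_work(into=workq)
# `from` is a reserved word in Python, so the keyword is named `source` here.
do_work(source=workq, respond_on=resultsq)
# do_work would do respond_on.put_nowait(result) instead of
# return result

results = {}

while True:
    try:
        # A bare get() blocks forever and never raises queue.Empty,
        # so wait with a timeout instead.
        result = resultsq.get(timeout=1)
    except queue.Empty:
        break  # maybe? You'd probably want to retry a few times
    # Only the main thread touches `results`, so no lock is needed.
    results[result] = results.get(result, 0) + 1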
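
The outline above can be fleshed out into something runnable. This is a sketch for Python 3 under stated assumptions: `process`, `do_work`, and `tally` are hypothetical names, and `process` simply returns a fixed string in place of real work. Workers put each result on `resultsq` before calling `task_done()`, so once `workq.join()` returns the main thread can drain the results queue without waiting.

```python
import queue
import threading


def process(item):
    # Hypothetical stand-in for the real work (e.g. an HTTP request).
    return "all good"


def do_work(workq, resultsq):
    # Worker loop: take a job, publish its result, then mark the job done.
    while True:
        item = workq.get()
        try:
            resultsq.put(process(item))
        finally:
            workq.task_done()


def tally(num_workers=20, num_jobs=100):
    workq = queue.Queue()
    resultsq = queue.Queue()

    for _ in range(num_workers):
        threading.Thread(
            target=do_work, args=(workq, resultsq), daemon=True
        ).start()

    for _ in range(num_jobs):
        workq.put("something")

    # Each result is queued before its task_done() call, so after join()
    # every result is already sitting in resultsq.
    workq.join()

    results = {}
    while True:
        try:
            result = resultsq.get_nowait()
        except queue.Empty:
            break
        # Only this thread updates the dict, so no lock is required.
        results[result] = results.get(result, 0) + 1
    return results


if __name__ == "__main__":
    for result, count in tally().items():
        print("{0}: {1} time(s)".format(result, count))
```

The design choice here is that the shared dictionary is owned by a single thread; the queues, which are thread-safe, carry all cross-thread traffic.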

Regarding "python - Queue doesn't process all elements when there are many threads", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/39152680/
