
Python - multiprocessing threads don't close when using a queue

Repost · Author: 行者123 · Updated: 2023-11-30 21:49:50

This applies to Python 3.x.

I load records from a CSV file in batches of 300 and then spawn worker threads to submit them to a REST API. I save each HTTP response in a queue so that, once the whole CSV file has been processed, I can get a count of skipped records. However, after I added the queue to the workers, the threads no longer seem to close. I want to monitor the number of threads for two reasons: (1) once they are all done, I can calculate and display the skip count, and (2) I want to enhance the script to spawn no more than 20 or so threads at a time, so I don't run out of memory.

I have two questions:

  • Can someone explain why the threads stay alive when q.put() is used?
  • Is there a different way to manage the number of threads and to monitor whether they have all finished?

Here is my code (somewhat simplified, since I can't share the exact details of the API I'm calling):

import requests, json, csv, time, datetime, multiprocessing

TEST_FILE = 'file.csv'

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action": "createOrUpdate", "input": leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text)  # this puts the response in a queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue()  # this is a queue to put all HTTP responses in, so we can count the skips
    jobs = []
    for leads in read_test_data(TEST_FILE):  # this function reads a CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads, q,))
        jobs.append(p)
        p.start()
    time.sleep(20)  # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children()))  ## always returns the number of threads. If I remove 'q.put' from worker, it returns 0

    # The intent is to wait until all workers are done, but it results in an infinite loop
    # when I remove 'q.put' in the worker it works fine
    #while len(multiprocessing.active_children()) > 0:
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty():  # calculate number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

Best Answer

This is most likely caused by this documented quirk of multiprocessing.Queue:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

Basically, you need to make sure you get() all the items from the Queue to guarantee that every process which put something onto that queue will be able to exit.
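For illustration, here is a minimal sketch of how the question's original approach could be made to terminate by draining the queue before joining. It reuses worker and read_test_data from the question and assumes each worker puts exactly one item; treat it as a sketch of the ordering, not the recommended fix:

if __name__ == "__main__":
    q = multiprocessing.Queue()
    jobs = []
    for leads in read_test_data(TEST_FILE):
        p = multiprocessing.Process(target=worker, args=(leads, q))
        jobs.append(p)
        p.start()

    # Drain the queue first: each get() lets one child's feeder thread
    # flush its buffered item, so that child can terminate.
    responses = [q.get() for _ in jobs]  # holds each worker's r.text for the skip count

    # Only now join; every child has had its queued data consumed.
    for p in jobs:
        p.join()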

I think in this case you're better off using a multiprocessing.Pool and submitting all your jobs to multiprocessing.Pool.map. That simplifies things considerably and gives you complete control over the number of running processes:

def worker(leads):
    payload = {"action": "createOrUpdate", "input": leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    return r.text

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)  # cpu_count() * 2 processes running in the pool
    responses = pool.map(worker, read_test_data(TEST_FILE))

    skipped_count = 0
    for raw_response in responses:
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

If you're worried about the memory consumption of converting read_test_data(TEST_FILE) into a list (which is required in order to use Pool.map), you can use Pool.imap instead.
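A minimal sketch of that imap variant, using the same worker as above, might look like this; results are consumed lazily as they arrive:

if __name__ == "__main__":
    skipped_count = 0
    with multiprocessing.Pool(multiprocessing.cpu_count() * 2) as pool:
        # imap yields results lazily, so chunks are submitted as workers free up
        # instead of the whole CSV being materialized as a list up front.
        for raw_response in pool.imap(worker, read_test_data(TEST_FILE)):
            http_response = json.loads(raw_response)
            for i in http_response['result']:
                if i['status'] == "skipped" and i['reasons'][0]['code'] == "1004":
                    skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))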

Edit:

As I mentioned in a comment above, this use case looks I/O-bound, which means you may get better performance by using a multiprocessing.dummy.Pool (which uses a thread pool instead of a process pool). Try both and see which is faster.
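A rough sketch of the thread-pool variant, again assuming the same worker as above. The pool size of 20 is only an example matching the cap mentioned in the question, and each chunk is copied before being handed to a thread because read_test_data reuses a single list and threads share memory:

from multiprocessing.dummy import Pool as ThreadPool  # same Pool API, backed by threads

if __name__ == "__main__":
    pool = ThreadPool(20)  # e.g. cap at ~20 concurrent requests, per the question
    # Copy each chunk: read_test_data clears and reuses one list, and threads
    # (unlike processes) see that shared object directly.
    chunks = [list(chunk) for chunk in read_test_data(TEST_FILE)]
    responses = pool.map(worker, chunks)  # 'responses' can feed the same skip-count loop as above
    pool.close()
    pool.join()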

Regarding "Python - multiprocessing threads don't close when using a queue", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31170788/
