gpt4 book ai didi

Python - 多个同时线程池

转载 作者:太空狗 更新时间:2023-10-30 00:57:38 28 4
gpt4 key购买 nike

我正在用 python 编写一个网络抓取工具,使用 httplib2 和 lxml(是的 - 我知道我可以使用 scrapy。让我们过去吧......)这个抓取工具有大约 15000 个页面可以解析成大约 400,000 个项目。我已经得到了解析项目的代码以立即(几乎)运行,但是从服务器下载页面的部分仍然非常慢。我想通过并发来克服这个问题。但是,我不能依赖每次都需要解析的每个页面。我尝试过使用单个 ThreadPool(如 multiprocessing.pool,但使用线程完成 - 这应该没问题,因为这是一个 I/O 绑定(bind)进程),但我想不出一种优雅(或有效)的方式来获取当最后一个索引项的日期大于我们正在处理的项目时,所有线程都将停止。现在,我正在研究一种使用 ThreadPool 的两个实例的方法 - 一个用于下载每个页面,另一个用于解析页面。一个简化的代码示例是:

#! /usr/bin/env python2

import httplib2
from Queue import PriorityQueue
from multiprocessing.pool import ThreadPool
from lxml.html import fromstring

pages = [x for x in range(1000)]
page_queue = PriorityQueue(1000)

url = "http://www.google.com"

def get_page(page):
#Grabs google.com
h = httplib2.Http(".cache")
resp, content = h.request(url, "GET")
tree = fromstring(str(content), base_url=url)
page_queue.put((page, tree))
print page_queue.qsize()

def parse_page():
page_num, page = page_queue.get()
print "Parsing page #" + str(page_num)
#do more stuff with the page here
page_queue.task_done()

if __name__ == "__main__":
collect_pool = ThreadPool()
collect_pool.map_async(get_page, pages)
collect_pool.close()

parse_pool = ThreadPool()
parse_pool.apply_async(parse_page)
parse_pool.close()


parse_pool.join()
collect_pool.join()
page_queue.join()

然而,运行这段代码并没有达到我的预期——即触发两个线程池:一个填充队列,另一个从队列中拉出进行解析。它开始收集池并运行它,然后开始 parse_pool 并运行它(我假设,我没有让代码运行足够长的时间来到达 parse_pool - 关键是 collect_pool 似乎正在运行).我很确定我搞砸了对 join() 的调用顺序,但我终究无法弄清楚它们应该采用的顺序。我的问题基本上是这样的:我在这里吠叫正确的树吗?如果是这样,我到底做错了什么?如果我不是 - 你的建议是什么

最佳答案

首先,您的设计在高层次上似乎是正确的。 httlib2 模块的同步特性证明了使用线程池收集页面是合理的。 (对于异步库,一个线程就足够了;请注意,即使使用 httplib2 和池,由于 GIL,任何时候也最多有一个收集器线程在运行。)解析池由用 C/C/编写的 lxml 模块证明是合理的C++(并假设 Global Interpreter Lock 在页面解析期间被释放 - 这将在 lxml 文档或代码中检查!)。如果后一种情况不成立,那么拥有专用解析池将不会带来性能提升,因为只有一个线程能够获取 GIL。在这种情况下,最好使用进程池。

我不熟悉 ThreadPool 实现,但我认为它类似于多处理模块中的 Pool 类。在此基础上,问题似乎是您只为 parse_pool 创建了一个工作项,并且在 parse_page 处理第一页之后,它从不尝试从那里出列更多页面。其他工作项也不会提交到该池,因此处理停止,并且在 parse_pool.close() 调用后(空)池的线程终止。

解决办法是消除page_queue。 get_page() 函数应该通过为它收集的每个页面调用 apply_async() 将一个工作项放在 parse_pool 上,而不是将它们送入 page_queue。

主线程应该等到 collect_queue 为空(即返回 collect_pool.join() 调用),然后它应该关闭 parse_pool(因为我们可以确定不会为解析器提交更多工作)。然后它应该通过调用 parse_pool.join() 等待 parse_pool 变空然后退出。

此外,您需要增加 connect_pool 中的线程数,以便并发处理更多的 http 请求。池中线程的默认数量是 CPU 的数量;目前你不能发出超过那么多的请求。您可以试验高达几千或一万的值;观察池的 CPU 消耗;它不应接近 1 个 CPU。

关于Python - 多个同时线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6039150/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com