
python - Web scraping with ProcessPoolExecutor: How to get data back to queue and results?

Reposted. Author: 行者123. Updated: 2023-12-01 01:01:07

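I have written a program that crawls a single website and scrapes certain data. I would like to speed it up with ProcessPoolExecutor, but I am having trouble understanding how to convert it from single-threaded to concurrent.

Specifically, when creating a job (via ProcessPoolExecutor.submit()), can I pass a class/object and args instead of a function and args?

If so, how do I return data from those jobs to a queue that tracks visited pages, and to a structure that holds the scraped content?

I have been using this as a starting point, and reviewing the Queue and concurrent.futures docs (frankly, the latter is a bit over my head). I have also searched Google/YouTube/SO many times, to no avail.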

from queue import Queue, Empty
from concurrent.futures import ProcessPoolExecutor


class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url  # url of page to scrape
        self.internal_urls = None
        self.content = None
        self.scrape()

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield urls linked in current page
        self.internal_urls = set(scraped_urls)

        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}


class CrawlManager:
    """
    Manages a multiprocess crawl and scrape of a single site
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ProcessPoolExecutor(max_workers=10)
        self.processed_urls = set([])
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}

    def crawl(self):
        while True:
            try:
                # get a url from the queue
                target_url = self.queued_urls.get(timeout=60)

                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add url to the processed list
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')

                    # passing an object to the
                    # ProcessPoolExecutor... can this be done?
                    job = self.pool.submit(Scraper, target_url)

                    """
                    How do I 1) return the data from each
                    Scraper instance into self.data?
                    and 2) put scraped links to self.queued_urls?
                    """

            except Empty:
                print("All done.")
            except Exception as e:
                print(e)


if __name__ == '__main__':
    crawler = CrawlManager('www.mywebsite.com')
    crawler.crawl()

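Best answer

For anyone who comes across this page, I was able to figure this out on my own.

Following @brad-solomon's suggestion, I switched from ProcessPoolExecutor to ThreadPoolExecutor to manage the concurrency in this script (see his comment for more details).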
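On the first part of the question (whether a class can be passed to submit()), a minimal sketch may help. submit() accepts any callable, and a class is a callable whose call returns a new instance, so the future's result is that instance. PageStub and submit_class are hypothetical names used only for illustration, and the "content" is a placeholder rather than real scraped data. With ProcessPoolExecutor the same call shape applies, but the class, its arguments, and the returned instance would also have to be picklable, since they cross a process boundary.

```python
from concurrent.futures import ThreadPoolExecutor


class PageStub:
    """Hypothetical stand-in for Scraper: does its work in __init__."""

    def __init__(self, url):
        self.url = url
        # placeholder for scraped content (no network access here)
        self.content = {"length": len(url)}


def submit_class(urls):
    # submit() accepts any callable; calling a class builds an instance,
    # so each future's result is a PageStub object.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(PageStub, url) for url in urls]
        # result() blocks until the corresponding job has finished
        return [future.result() for future in futures]


pages = submit_class(["a.example/1", "a.example/22"])
print([(page.url, page.content) for page in pages])
```

Reading the instance back through future.result() is enough to get data out of a job; the callback approach in this answer builds on the same future object.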

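With respect to the original question, the key is to use the add_done_callback method of the Future object returned by ThreadPoolExecutor.submit, combined with a modification to Scraper.scrape and a new method, CrawlManager.proc_scraper_results, as follows: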

from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url  # url of page to scrape
        self.internal_urls = None
        self.content = None
        # scrape() is not called here: the pool calls it in a worker thread,
        # and calling it here as well would scrape every page twice

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield urls linked in current page
        self.internal_urls = set(scraped_urls)

        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}

        # these three items will be passed to the callback
        # function within a future object
        return self.internal_urls, self.url, self.content


class CrawlManager:
    """
    Manages a multithreaded crawl and scrape of a single website
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ThreadPoolExecutor(max_workers=10)
        self.processed_urls = set()
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}

    def proc_scraper_results(self, future):
        # get the items of interest from the future object; result() returns
        # what scrape() returned and re-raises any exception scrape() raised
        internal_urls, url, content = future.result()

        # assign scraped data/content
        self.data[url] = content

        # also add scraped links to queue if they
        # aren't already queued or already processed
        for link_url in internal_urls:
            if link_url not in self.queued_urls.queue and link_url not in self.processed_urls:
                self.queued_urls.put(link_url)

    def crawl(self):
        while True:
            try:
                # get a url from the queue
                target_url = self.queued_urls.get(timeout=60)

                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add url to the processed list
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')

                    # add a job to the ThreadPoolExecutor (note, unlike original question, we pass a method, not an object)
                    job = self.pool.submit(Scraper(target_url).scrape)

                    # to add_done_callback we send another function, this one from CrawlManager
                    # when this function is itself called, it will be passed a `future` object
                    job.add_done_callback(self.proc_scraper_results)

            except Empty:
                # nothing was queued for 60 seconds: treat the crawl as finished
                print("All done.")
                return
            except Exception as e:
                print(e)


if __name__ == '__main__':
    crawler = CrawlManager('www.mywebsite.com')
    crawler.crawl()
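Since the code above leaves the actual scraping as placeholders, here is a self-contained sketch of the same add_done_callback pattern that can be run as-is. Everything in it is an assumption made for illustration: FAKE_SITE is a hypothetical in-memory stand-in for a website, scrape replaces Scraper.scrape, and MiniCrawler is not the CrawlManager from this answer. It also swaps the 60-second queue timeout for a counter of outstanding jobs, so the crawl ends as soon as the last job finishes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "site": url -> (content, links found on that page)
FAKE_SITE = {
    "/": ("home", ["/a", "/b"]),
    "/a": ("page a", ["/b", "/c"]),
    "/b": ("page b", ["/"]),
    "/c": ("page c", []),
}


def scrape(url):
    """Stand-in for Scraper.scrape: returns (links, url, content)."""
    content, links = FAKE_SITE[url]
    return set(links), url, content


class MiniCrawler:
    def __init__(self, seed_url):
        self.pool = ThreadPoolExecutor(max_workers=4)
        self.lock = threading.Lock()  # guards seen, data and pending
        self.seen = {seed_url}        # urls already submitted
        self.data = {}                # url -> scraped content
        self.pending = 1              # jobs submitted but not yet processed
        self.done = threading.Event()
        # submit last, so every attribute exists before a callback can run
        self.pool.submit(scrape, seed_url).add_done_callback(self._on_done)

    def _on_done(self, future):
        # called with the finished future, usually in a worker thread
        links, url, content = future.result()
        with self.lock:
            self.data[url] = content
            new_links = [link for link in links if link not in self.seen]
            self.seen.update(new_links)
            # count the new jobs before releasing this one
            self.pending += len(new_links) - 1
            finished = self.pending == 0
        for link in new_links:
            self.pool.submit(scrape, link).add_done_callback(self._on_done)
        if finished:
            self.done.set()

    def crawl(self, timeout=10):
        # wait until no jobs are outstanding (or give up after `timeout` seconds)
        self.done.wait(timeout=timeout)
        self.pool.shutdown()
        return self.data


crawled = MiniCrawler("/").crawl()
print(sorted(crawled.items()))
```

The lock is there because callbacks can run in several worker threads at once; the sketch does not handle a scrape that raises, in which case future.result() would raise inside the callback and the counter would never reach zero before the timeout.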

The result was a significantly shorter run time for the program.
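As a possible alternative to callbacks, results can also be gathered in the main thread with concurrent.futures.wait, so that only one thread ever touches the result structures. This is a sketch under assumptions, not the approach used in this answer: LINKS is a hypothetical link graph standing in for real pages, and fetch_links and crawl are illustrative names.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

# Hypothetical link graph standing in for real pages: url -> linked urls
LINKS = {
    "/": ["/a", "/b"],
    "/a": ["/b", "/c"],
    "/b": ["/"],
    "/c": [],
}


def fetch_links(url):
    """Stand-in for a real page request; returns the url and its links."""
    return url, LINKS.get(url, [])


def crawl(seed_url, max_workers=4):
    seen = {seed_url}  # urls already submitted
    results = {}       # url -> links found on that page
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(fetch_links, seed_url)}
        while pending:
            # block until at least one job finishes, then handle it here,
            # in the main thread
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                url, links = future.result()
                results[url] = links
                for link in links:
                    if link not in seen:
                        seen.add(link)
                        pending.add(pool.submit(fetch_links, link))
    return results


site_map = crawl("/")
print(sorted(site_map))
```

Because the loop ends when no futures are pending, this variant needs neither a timeout nor a lock.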

Regarding "python - Web scraping with ProcessPoolExecutor: How to get data back to queue and results?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55788116/
