gpt4 book ai didi

python - 如何在 pyspark 中并行下载大量 URL 列表?

转载 作者:行者123 更新时间:2023-12-01 07:12:19 28 4
gpt4 key购买 nike

我有一个包含 10000 个要获取的 url 的 RDD。

list = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
'http://google.com',
'http://twitter.com']
urls = sc.parallelize(list)

我需要检查哪些网址已损坏,并且最好将结果获取到 Python 中的相应 RDD。我试过这个:

import asyncio
import concurrent.futures
import requests

async def get(url):

with concurrent.futures.ThreadPoolExecutor() as executor:

loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
requests.get,
i
)
for i in url
]
return futures

async def get_response(futures):
response = await asyncio.gather(futures,return_exceptions=True)
return(response)

tasks = urls.map(lambda query: get(query)) # Method returns http call response as a Future[String]

results = tasks.map(lambda task: get_response(task) )
results = results.map(lambda response:'ERR' if isinstance(response, Exception) else 'OK' )
results.collect()

我得到以下输出,这显然是不正确的:

['OK', 'OK', 'OK']

我也尝试过这个:

import asyncio
import concurrent.futures
import requests

async def get():

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
requests.get,
i
)
for i in urls.toLocalIterator()
]
for response in await asyncio.gather(*futures,return_exceptions=True):
print('{}: {}'.format(response, 'ERR' if isinstance(response, Exception) else 'OK'))
pass


loop = asyncio.get_event_loop()
loop.run_until_complete(get())

我得到以下输出:

HTTPConnectionPool(host='SDFKHSKHGKLHSKLJHGSDFKSJH.com', port=80): Max retries exceeded with url: / (Caused by 
NewConnectionError('<urllib3.connection.HTTPConnection object at 0x12c834210>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known')): ERR
<Response [200]>: OK
<Response [200]>: OK

期望的输出是这样的:

http://SDFKHSKHGKLHSKLJHGSDFKSJH.com : ERR
http://google.com : OK
http://twitter.com : OK

但是第二种方法的问题是使用列表来存储 future 的对象。我相信使用 RDD 更好,因为 url 的数量可能达到数百万或数十亿,并且没有一台机器可以处理它。我也不清楚如何从响应中检索网址。

最佳答案

如果您使用concurrent.futures,则根本不需要 asyncio(它不会给您带来任何好处,因为无论如何您都是在多个线程中运行)。您可以使用 concurrent.futures.wait() 并行等待多个 future。

我无法测试您的数据,但它应该适用于如下代码:

import concurrent.futures, requests

def get_one(url):
resp = requests.get(url)
resp.raise_for_status()
return resp.text

def get_all():
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = [executor.submit(get_one, url)
for url in urls.toLocalIterator()]
# the end of the "with" block will automatically wait
# for all of the executor's tasks to complete

for fut in futures:
if fut.exception() is not None:
print('{}: {}'.format(fut.exception(), 'ERR')
else:
print('{}: {}'.format(fut.result(), 'OK')

要使用 asyncio 执行相同的操作,您应该使用 aiohttp相反。

关于python - 如何在 pyspark 中并行下载大量 URL 列表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58146055/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com