- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在编写一个网络爬虫,它为许多不同的域运行并行提取。我想限制每秒向每个单独的域发出的请求数,但我不关心打开的连接总数或每秒的总请求数跨越所有领域。 我想最大限度地提高打开的连接数和每秒请求数,同时限制对单个域的每秒请求数。
我可以找到所有当前存在的示例 (1) 限制打开的连接数或 (2) 限制获取循环中每秒发出的请求总数。示例包括:
它们都没有按照我的要求执行,即在每个域的基础上限制每秒请求数。第一个问题仅回答如何限制整体每秒请求数。第二个甚至没有实际问题的答案(OP 询问每秒的请求数,答案都在谈论限制连接数)。
这是我尝试的代码,使用我为同步版本制作的简单速率限制器,当 DomainTimer 代码在异步事件循环中运行时它不起作用:
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio
import async_timeout
import aiohttp
from urllib.parse import urlparse
from queue import Queue, Empty
from HTMLProcessing import processHTML
import URLFilters
SEED_URLS = ['http://www.bbc.co.uk', 'http://www.news.google.com']
url_queue = Queue()
for u in SEED_URLS:
url_queue.put(u)
# number of pages to download per run of crawlConcurrent()
BATCH_SIZE = 100
DELAY = timedelta(seconds = 1.0) # delay between requests from single domain, in seconds
HTTP_HEADERS = {'Referer': 'http://www.google.com',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0'}
class DomainTimer():
def __init__(self):
self.timer = None
def resetTimer(self):
self.timer = datetime.now()
def delayExceeded(self, delay):
if not self.timer: #We haven't fetched this before
return True
if (datetime.now() - self.timer) >= delay:
return True
else:
return False
crawl_history = defaultdict(dict) # given a URL, when is last time crawled?
domain_timers = defaultdict(DomainTimer)
async def fetch(session, url):
domain = urlparse(url).netloc
print('here fetching ' + url + "\n")
dt = domain_timers[domain]
if dt.delayExceeded(DELAY) or not dt:
with async_timeout.timeout(10):
try:
dt.resetTimer() # reset domain timer
async with session.get(url, headers=HTTP_HEADERS) as response:
if response.status == 200:
crawl_history[url] = datetime.now()
html = await response.text()
return {'url': url, 'html': html}
else:
# log HTTP response, put into crawl_history so
# we don't attempt to fetch again
print(url + " failed with response: " + str(response.status) + "\n")
return {'url': url, 'http_status': response.status}
except aiohttp.ClientConnectionError as e:
print("Connection failed " + str(e))
except aiohttp.ClientPayloadError as e:
print("Recieved bad data from server @ " + url + "\n")
else: # Delay hasn't passed yet: skip for now & put @ end of q
url_queue.put(url);
return None
async def fetch_all(urls):
"""Launch requests for all web pages."""
tasks = []
async with aiohttp.ClientSession() as session:
for url in urls:
task = asyncio.ensure_future(fetch(session, url))
tasks.append(task) # create list of tasks
return await asyncio.gather(*tasks) # gather task responses
def batch_crawl():
"""Launch requests for all web pages."""
start_time = datetime.now()
# Here we build the list of URLs to crawl for this batch
urls = []
for i in range(BATCH_SIZE):
try:
next_url = url_queue.get_nowait() # get next URL from queue
urls.append(next_url)
except Empty:
print("Processed all items in URL queue.\n")
break;
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
pages = loop.run_until_complete(fetch_all(urls))
crawl_time = (datetime.now() - start_time).seconds
print("Crawl completed. Fetched " + str(len(pages)) + " pages in " + str(crawl_time) + " seconds.\n")
return pages
def parse_html(pages):
""" Parse the HTML for each page downloaded in this batch"""
start_time = datetime.now()
results = {}
for p in pages:
if not p or not p['html']:
print("Received empty page")
continue
else:
url, html = p['url'], p['html']
results[url] = processHTML(html)
processing_time = (datetime.now() - start_time).seconds
print("HTML processing finished. Processed " + str(len(results)) + " pages in " + str(processing_time) + " seconds.\n")
return results
def extract_new_links(results):
"""Extract links from """
# later we could track where links were from here, anchor text, etc,
# and weight queue priority based on that
links = []
for k in results.keys():
new_urls = [l['href'] for l in results[k]['links']]
for u in new_urls:
if u not in crawl_history.keys():
links.append(u)
return links
def filterURLs(urls):
urls = URLFilters.filterDuplicates(urls)
urls = URLFilters.filterBlacklistedDomains(urls)
return urls
def run_batch():
pages = batch_crawl()
results = parse_html(pages)
links = extract_new_links(results)
for l in filterURLs(links):
url_queue.put(l)
return results
没有错误或异常抛出,速率限制代码在同步获取中工作正常,但 DomainTimer 在异步循环中运行时没有明显效果。不支持每个域每秒一个请求的延迟...
我如何修改这个同步速率限制代码以在异步事件循环中工作?谢谢!
最佳答案
很难调试你的代码,因为它包含许多不相关的东西,在一个新的简单示例上更容易展示想法。
主要思想:
__aenter__
、__aexit__
编写类似Semaphore
的类接受 url(域)Lock
来防止对同一域的多个请求代码:
import asyncio
import aiohttp
from urllib.parse import urlparse
from collections import defaultdict
class Limiter:
# domain -> req/sec:
_limits = {
'httpbin.org': 4,
'eu.httpbin.org': 1,
}
# domain -> it's lock:
_locks = defaultdict(lambda: asyncio.Lock())
# domain -> it's last request time
_times = defaultdict(lambda: 0)
def __init__(self, url):
self._host = urlparse(url).hostname
async def __aenter__(self):
await self._lock
to_wait = self._to_wait_before_request()
print(f'Wait {to_wait} sec before next request to {self._host}')
await asyncio.sleep(to_wait)
async def __aexit__(self, *args):
print(f'Request to {self._host} just finished')
self._update_request_time()
self._lock.release()
@property
def _lock(self):
"""Lock that prevents multiple requests to same host."""
return self._locks[self._host]
def _to_wait_before_request(self):
"""What time we need to wait before request to host."""
request_time = self._times[self._host]
request_delay = 1 / self._limits[self._host]
now = asyncio.get_event_loop().time()
to_wait = request_time + request_delay - now
to_wait = max(0, to_wait)
return to_wait
def _update_request_time(self):
now = asyncio.get_event_loop().time()
self._times[self._host] = now
# request that uses Limiter instead of Semaphore:
async def get(url):
async with Limiter(url):
async with aiohttp.ClientSession() as session: # TODO reuse session for different requests.
async with session.get(url) as resp:
return await resp.text()
# main:
async def main():
coros = [
get('http://httpbin.org/get'),
get('http://httpbin.org/get'),
get('http://httpbin.org/get'),
get('http://httpbin.org/get'),
get('http://httpbin.org/get'),
get('http://eu.httpbin.org/get'),
get('http://eu.httpbin.org/get'),
get('http://eu.httpbin.org/get'),
get('http://eu.httpbin.org/get'),
get('http://eu.httpbin.org/get'),
]
await asyncio.gather(*coros)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
关于python - aiohttp:按域限制每秒请求数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49708101/
我想要一个类似于 django runserver 所做的重新加载。 如果我更改 python 文件,我希望应用程序重新加载。我已经安装了 aiohttp-devtools 并使用 adev runs
我在使用 RouteTableDef 时遇到问题。 有一些项目的路由结构如下: 1) 有文件route.py。 路线.py from aiohttp import web routes = web.R
我有一些代码对某些 API 进行请求序列。我想为所有人设置通用日志记录,我该如何设置? 假设我的代码是这样的 import aiohttp import asyncio async def fetch
您能否就以下方面提出建议? 在 localhost:8900 上有 aiohttp 服务器在运行 当我从 python 发出类似(使用 python2 模块请求)的请求时 requests.get("
每当我对使用 asyncio 和 aiohttp 访问的 API 执行超过 200 个请求时,我都会收到 aiohttp client_exception.ServerDisconnectedErro
在我正在开发的爬虫中。它使用 pycurl multi 发出请求。 如果我改用aiohttp,我可以期待什么样的效率提升? 怀疑让我怀疑潜在的改进,因为 python 有 GIL。大部分时间都花在等待
我在尝试使用 azure 测试聊天机器人时遇到一些问题: 我使用 github actions 在 azure web 应用程序上部署了我的机器人,一切都很顺利。但是当我尝试测试我的聊天机器人时,没有
我在尝试使用 azure 测试聊天机器人时遇到一些问题: 我使用 github actions 在 azure web 应用程序上部署了我的机器人,一切都很顺利。但是当我尝试测试我的聊天机器人时,没有
我想知道如何从 aiohttp post 方法获取当前的上传步骤。通常我会使用 get 方法在循环中拉取当前步骤,但如果主机不响应当前上传步骤,这将不起作用。那么有可能得到当前步骤吗?诸如“从 xx%
我目前正在用 aiohttp 做我的第一个“婴儿学步” (来自 requests 模块)。 我尝试稍微简化请求,这样我就不必在主模块中为每个请求使用上下文管理器。 因此我尝试了这个: async de
tl;dr:如何最大化可以并行发送的 http 请求数量? 我正在使用 aiohttp 库从多个网址获取数据。我正在测试它的性能,并且观察到该过程中的某个地方存在瓶颈,一次运行更多的网址并没有帮助。
目前我正在执行以下操作来获取当前正在运行的应用程序 async def handler(request): app = request.app 是否有其他方法来获取当前正在运行的应用程序?考虑
首先是代码: import random import asyncio from aiohttp import ClientSession import csv headers =[] def ext
我的 aiohttp 中间件获取函数作为参数,而不是已传递给路由的绑定(bind)方法。如何解释这种行为?如何避免这种情况? class AsyncHttpServer: def __init
我正在尝试在 aiohttp 处理程序中启动后台长时间任务: from aiohttp import web import time import asyncio async def one(requ
我正在测试 aiohttp 和 asyncio。我希望相同的事件循环具有套接字、http 服务器、http 客户端。 我正在使用此示例代码: @routes.get('/') async def he
#!/usr/bin/env python3.5 import asyncio import aiohttp url = "http://eniig.dk" async def main():
考虑以下代码: from aiohttp_mako import template def authorize(): def wrapper(func): @asyncio.c
我正在编写一个网络爬虫,它为许多不同的域运行并行提取。我想限制每秒向每个单独的域发出的请求数,但我不关心打开的连接总数或每秒的总请求数跨越所有领域。 我想最大限度地提高打开的连接数和每秒请求数,同时限
我需要将 sub_app 添加到 sub_app。这是我的代码 app = web.Application() subapp = web.Application() subapp.router.add
我是一名优秀的程序员,十分优秀!