- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 list
的 awaitables
我想传递给 asyncio.AbstractEventLoop
但我需要限制对第三方 API 的请求。
我想避免等待将 future
传递给循环的东西,因为与此同时我阻止了我的循环等待。我有什么选择? Semaphores
和 ThreadPools
将限制并发运行的数量,但这不是我的问题。我需要将我的请求限制为 100/秒,但完成请求需要多长时间并不重要。
这是一个使用标准库的非常简洁的(非)工作示例,它演示了问题。这应该以 100/秒的速度 throttle ,但以 116.651/秒的速度 throttle 。 在 asyncio 中限制异步请求调度的最佳方法是什么?
工作代码:
import asyncio
from threading import Lock
class PTBNL:
def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.token_bucket = TokenBucket()
self.token_bucket.set_rate(100)
def run(self, *awaitables):
loop = asyncio.get_event_loop()
if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)
def sleep(self, secs) -> True:
self.run(asyncio.sleep(secs))
return True
def get_req_id(self) -> int:
new_id = self._req_id_seq
self._req_id_seq += 1
return new_id
def start_req(self, key):
loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future
def end_req(self, key, result=None):
future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)
def req_data(self, req_id, obj):
# Do Some Work Here
self.req_data_end(req_id)
pass
def req_data_end(self, req_id):
print(req_id, " has ended")
self.end_req(req_id)
async def req_data_async(self, obj):
req_id = self.get_req_id()
future = self.start_req(req_id)
self.req_data(req_id, obj)
await future
return future.result()
async def req_data_batch_async(self, contracts):
futures = []
FLAG = False
for contract in contracts:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)
nap = self.token_bucket.consume(1)
if FLAG is False:
FLAG = True
start = asyncio.get_event_loop().time()
asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract)
await asyncio.gather(*futures)
elapsed = asyncio.get_event_loop().time() - start
return futures, len(contracts)/elapsed
class TokenBucket:
def __init__(self):
self.tokens = 0
self.rate = 0
self.last = asyncio.get_event_loop().time()
self.lock = Lock()
def set_rate(self, rate):
with self.lock:
self.rate = rate
self.tokens = self.rate
def consume(self, tokens):
with self.lock:
if not self.rate:
return 0
now = asyncio.get_event_loop().time()
lapse = now - self.last
self.last = now
self.tokens += lapse * self.rate
if self.tokens > self.rate:
self.tokens = self.rate
self.tokens -= tokens
if self.tokens >= 0:
return 0
else:
return -self.tokens / self.rate
if __name__ == '__main__':
asyncio.get_event_loop().set_debug(True)
app = PTBNL()
objs = [obj for obj in range(500)]
l,t = app.run(app.req_data_batch_async(objs))
print(l)
print(t)
编辑:我在此处使用信号量添加了一个 TrottleTestApp
的简单示例,但仍然无法限制执行:
import asyncio
import time
class ThrottleTestApp:
def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.sem = asyncio.Semaphore()
async def allow_requests(self, sem):
"""Permit 100 requests per second; call
loop.create_task(allow_requests())
at the beginning of the program to start this routine. That call returns
a task handle that can be canceled to end this routine.
asyncio.Semaphore doesn't give us a great way to get at the value other
than accessing sem._value. We do that here, but creating a wrapper that
adds a current_value method would make this cleaner"""
while True:
while sem._value < 100: sem.release()
await asyncio.sleep(1) # Or spread more evenly
# with a shorter sleep and
# increasing the value less
async def do_request(self, req_id, obj):
await self.sem.acquire()
# this is the work for the request
self.req_data(req_id, obj)
def run(self, *awaitables):
loop = asyncio.get_event_loop()
if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)
def sleep(self, secs: [float]=0.02) -> True:
self.run(asyncio.sleep(secs))
return True
def get_req_id(self) -> int:
new_id = self._req_id_seq
self._req_id_seq += 1
return new_id
def start_req(self, key):
loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future
def end_req(self, key, result=None):
future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)
def req_data(self, req_id, obj):
# This is the method that "does" something
self.req_data_end(req_id)
pass
def req_data_end(self, req_id):
print(req_id, " has ended")
self.end_req(req_id)
async def req_data_batch_async(self, objs):
futures = []
FLAG = False
for obj in objs:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)
if FLAG is False:
FLAG = True
start = time.time()
self.do_request(req_id, obj)
await asyncio.gather(*futures)
elapsed = time.time() - start
print("Roughly %s per second" % (len(objs)/elapsed))
return futures
if __name__ == '__main__':
asyncio.get_event_loop().set_debug(True)
app = ThrottleTestApp()
objs = [obj for obj in range(10000)]
app.run(app.req_data_batch_async(objs))
最佳答案
您可以通过实现 leaky bucket algorithm 来做到这一点:
import asyncio
import contextlib
import collections
import time
from types import TracebackType
from typing import Dict, Optional, Type
try: # Python 3.7
base = contextlib.AbstractAsyncContextManager
_current_task = asyncio.current_task
except AttributeError:
base = object # type: ignore
_current_task = asyncio.Task.current_task # type: ignore
class AsyncLeakyBucket(base):
"""A leaky bucket rate limiter.
Allows up to max_rate / time_period acquisitions before blocking.
time_period is measured in seconds; the default is 60.
"""
def __init__(
self,
max_rate: float,
time_period: float = 60,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
self._loop = loop
self._max_level = max_rate
self._rate_per_sec = max_rate / time_period
self._level = 0.0
self._last_check = 0.0
# queue of waiting futures to signal capacity to
self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict()
def _leak(self) -> None:
"""Drip out capacity from the bucket."""
if self._level:
# drip out enough level for the elapsed time since
# we last checked
elapsed = time.time() - self._last_check
decrement = elapsed * self._rate_per_sec
self._level = max(self._level - decrement, 0)
self._last_check = time.time()
def has_capacity(self, amount: float = 1) -> bool:
"""Check if there is enough space remaining in the bucket"""
self._leak()
requested = self._level + amount
# if there are tasks waiting for capacity, signal to the first
# there there may be some now (they won't wake up until this task
# yields with an await)
if requested < self._max_level:
for fut in self._waiters.values():
if not fut.done():
fut.set_result(True)
break
return self._level + amount <= self._max_level
async def acquire(self, amount: float = 1) -> None:
"""Acquire space in the bucket.
If the bucket is full, block until there is space.
"""
if amount > self._max_level:
raise ValueError("Can't acquire more than the bucket capacity")
loop = self._loop or asyncio.get_event_loop()
task = _current_task(loop)
assert task is not None
while not self.has_capacity(amount):
# wait for the next drip to have left the bucket
# add a future to the _waiters map to be notified
# 'early' if capacity has come up
fut = loop.create_future()
self._waiters[task] = fut
try:
await asyncio.wait_for(
asyncio.shield(fut),
1 / self._rate_per_sec * amount,
loop=loop
)
except asyncio.TimeoutError:
pass
fut.cancel()
self._waiters.pop(task, None)
self._level += amount
return None
async def __aenter__(self) -> None:
await self.acquire()
return None
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType]
) -> None:
return None
请注意,我们从桶中机会性地泄漏容量,不需要运行单独的异步任务来降低级别;相反,当测试剩余容量是否足够时,容量会泄漏。
请注意,等待容量的任务保存在有序字典中,当可能再次有空闲容量时,第一个仍在等待的任务会提前唤醒。
您可以将其用作上下文管理器;尝试在桶满时获取桶 block ,直到再次释放足够的容量:
bucket = AsyncLeakyBucket(100)
# ...
async with bucket:
# only reached once the bucket is no longer full
或者你可以直接调用acquire()
:
await bucket.acquire() # blocks until there is space in the bucket
或者您可以先简单地测试是否有空间:
if bucket.has_capacity():
# reject a request due to rate limiting
请注意,您可以通过增加或减少“滴入”桶中的量来将某些请求计为“较重”或“较轻”:
await bucket.acquire(10)
if bucket.has_capacity(0.5):
尽管如此,请务必小心;当混合大滴滴和小滴滴时,当达到或接近最大速率时,小滴滴往往先于大滴滴运行,因为在有空间容纳更大滴滴之前,更可能有足够的可用容量容纳更小的滴滴。
演示:
>>> import asyncio, time
>>> bucket = AsyncLeakyBucket(5, 10)
>>> async def task(id):
... await asyncio.sleep(id * 0.01)
... async with bucket:
... print(f'{id:>2d}: Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task(i) for i in range(15)]
>>> result = asyncio.run(asyncio.wait(tasks))
0: Drip! 0.00
1: Drip! 0.02
2: Drip! 0.02
3: Drip! 0.03
4: Drip! 0.04
5: Drip! 2.05
6: Drip! 4.06
7: Drip! 6.06
8: Drip! 8.06
9: Drip! 10.07
10: Drip! 12.07
11: Drip! 14.08
12: Drip! 16.08
13: Drip! 18.08
14: Drip! 20.09
开始时,桶会被快速填满,从而使其余任务分布得更均匀;每 2 秒就会释放足够的容量来处理另一项任务。
最大突发大小等于最大速率值,在上面的演示中设置为 5。如果您不想允许突发,请将最大速率设置为 1,并将时间段设置为之间的最小时间点滴:
>>> bucket = AsyncLeakyBucket(1, 1.5) # no bursts, drip every 1.5 seconds
>>> async def task():
... async with bucket:
... print(f'Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task() for _ in range(5)]
>>> result = asyncio.run(asyncio.wait(tasks))
Drip! 0.00
Drip! 1.50
Drip! 3.01
Drip! 4.51
Drip! 6.02
我已经开始将其打包为一个 Python 项目:https://github.com/mjpieters/aiolimiter
关于python - 在 Python Asyncio 中限制异步函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45440900/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!