- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在寻找一种方法来限制何时调用函数,但仅限于输入参数不同时,即:
@app.task(rate_limit="60/s")
def api_call(user):
do_the_api_call()
for i in range(0,100):
api_call("antoine")
api_call("oscar")
所以我希望 api_call("antoine")
每秒调用 60 次 和 api_call("oscar")
每秒 60 次。
关于我该怎么做的任何帮助?
--编辑 27/04/2015我曾尝试在任务中使用 rate_limit 调用子任务,但它也不起作用:rate_limit 始终应用于所有实例化的子任务或任务(这是合乎逻辑的)。
@app.task(rate_limit="60/s")
def sub_api_call(user):
do_the_api_call()
@app.task
def api_call(user):
sub_api_call(user)
for i in range(0,100):
api_call("antoine")
api_call("oscar")
最好!
最佳答案
请参阅评论部分以获取指向更好方法的链接,该方法包含此处的大部分内容,但修复了此处版本存在的乒乓问题。这里的版本天真地重试任务。也就是说,它只是稍后再试一次,有一些抖动。如果您有 1,000 个任务都在排队,这会造成困惑,因为它们都在争夺下一个可用位置。他们都只是在 task worker 中进进出出,在最终获得运行机会之前尝试了数百次。
我没有采用这种幼稚的方法,而是尝试了指数退避,每次任务受到限制时,它都会退避比之前更长的时间。这个概念可行,但它要求您存储每个任务的重试次数,这很烦人并且必须集中,而且它也不是最佳选择,因为在等待预定任务时可能会有很长时间的无事件延迟要运行的任务。 (想象一下第 50 次被节流的任务必须等待一个小时,而节流计时器在重新安排后的几秒后到期。在这种情况下,工作人员在等待时会空闲一个小时该任务要运行。)
尝试此操作的更好方法是使用调度程序,而不是单纯的重试或指数退避。评论部分中链接的更新版本维护了一个基本的调度程序,它知道何时 重试任务。它跟踪任务被限制的顺序,并知道下一个任务运行窗口何时出现。因此,想象一下 1 分钟的任务限制,时间轴如下:
00:00:00 - Task 1 is attempted and begins running
00:00:01 - Task 2 is attempted. Oh no! It gets throttled. The current
throttle expires at 00:01:00, so it is rescheduled then.
00:00:02 - Task 3 is attempted. Oh no! It gets throttled. The current
throttle expires at 00:01:00, but something is already
scheduled then, so it is rescheduled for 00:02:00.
00:01:00 - Task 2 attempts to run again. All clear! It runs.
00:02:00 - Task 3 attempts to run again. All clear! It runs.
换句话说,根据积压的长度,它将在当前限制到期后重新安排任务,并且所有其他重新安排的、受限制的任务都有机会运行。 (这花了数周时间才弄清楚。)
我今天在这方面花了一些时间,想出了一个不错的解决方案。所有其他解决方案都存在以下问题之一:
基本上,您可以像这样包装您的任务:
@app.task(bind=True, max_retries=10)
@throttle_task("2/s", key="domain", jitter=(2, 15))
def scrape_domain(self, domain):
do_stuff()
结果是您将任务限制为每个域参数每秒运行 2 次,随机重试抖动在 2-15 秒之间。 key
参数是可选的,但对应于您任务中的一个参数。如果没有给出关键参数,它只会将任务限制在给定的速率。如果提供,则 throttle 将应用于(任务,键)二元组。
另一种看待这个问题的方式是不使用装饰器。这提供了更多的灵 active ,但需要您自己进行重试。除了上述,您还可以:
@app.task(bind=True, max_retries=10)
def scrape_domain(self, domain):
proceed = is_rate_okay(self, "2/s", key=domain)
if proceed:
do_stuff()
else:
self.request.retries = task.request.retries - 1 # Don't count this as against max_retries.
return task.retry(countdown=random.uniform(2, 15))
我认为这与第一个示例相同。更长一点,更多分支,但更清楚地展示了它是如何工作的。我希望自己始终使用装饰器。
这一切都是通过在 redis 中保存一个计数来实现的。实现非常简单。您在 redis 中为任务创建一个键(和键参数,如果给定),并根据提供的时间表使 redis 键过期。如果用户设置 10/m 的速率,您将创建一个 60 秒的 Redis key ,并且每次尝试具有正确名称的任务时都会增加它。如果您的增量器变得太高,请重试该任务。否则,运行它。
def parse_rate(rate: str) -> Tuple[int, int]:
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
(Stolen from Django Rest Framework.)
"""
num, period = rate.split("/")
num_requests = int(num)
if len(period) > 1:
# It takes the form of a 5d, or 10s, or whatever
duration_multiplier = int(period[0:-1])
duration_unit = period[-1]
else:
duration_multiplier = 1
duration_unit = period[-1]
duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
duration = duration_base * duration_multiplier
return num_requests, duration
def throttle_task(
rate: str,
jitter: Tuple[float, float] = (1, 10),
key: Any = None,
) -> Callable:
"""A decorator for throttling tasks to a given rate.
:param rate: The maximum rate that you want your task to run. Takes the
form of '1/m', or '10/2h' or similar.
:param jitter: A tuple of the range of backoff times you want for throttled
tasks. If the task is throttled, it will wait a random amount of time
between these values before being tried again.
:param key: An argument name whose value should be used as part of the
throttle key in redis. This allows you to create per-argument throttles by
simply passing the name of the argument you wish to key on.
:return: The decorated function
"""
def decorator_func(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
# Inspect the decorated function's parameters to get the task
# itself and the value of the parameter referenced by key.
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
task = bound_args.arguments["self"]
key_value = None
if key:
try:
key_value = bound_args.arguments[key]
except KeyError:
raise KeyError(
f"Unknown parameter '{key}' in throttle_task "
f"decorator of function {task.name}. "
f"`key` parameter must match a parameter "
f"name from function signature: '{sig}'"
)
proceed = is_rate_okay(task, rate, key=key_value)
if not proceed:
logger.info(
"Throttling task %s (%s) via decorator.",
task.name,
task.request.id,
)
# Decrement the number of times the task has retried. If you
# fail to do this, it gets auto-incremented, and you'll expend
# retries during the backoff.
task.request.retries = task.request.retries - 1
return task.retry(countdown=random.uniform(*jitter))
else:
# All set. Run the task.
return func(*args, **kwargs)
return wrapper
return decorator_func
def is_rate_okay(task: Task, rate: str = "1/s", key=None) -> bool:
"""Keep a global throttle for tasks
Can be used via the `throttle_task` decorator above.
This implements the timestamp-based algorithm detailed here:
https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/
Basically, you keep track of the number of requests and use the key
expiration as a reset of the counter.
So you have a rate of 5/m, and your first task comes in. You create a key:
celery_throttle:task_name = 1
celery_throttle:task_name.expires = 60
Another task comes in a few seconds later:
celery_throttle:task_name = 2
Do not update the ttl, it now has 58s remaining
And so forth, until:
celery_throttle:task_name = 6
(10s remaining)
We're over the threshold. Re-queue the task for later. 10s later:
Key expires b/c no more ttl.
Another task comes in:
celery_throttle:task_name = 1
celery_throttle:task_name.expires = 60
And so forth.
:param task: The task that is being checked
:param rate: How many times the task can be run during the time period.
Something like, 1/s, 2/h or similar.
:param key: If given, add this to the key placed in Redis for the item.
Typically, this will correspond to the value of an argument passed to the
throttled task.
:return: Whether the task should be throttled or not.
"""
key = f"celery_throttle:{task.name}{':' + str(key) if key else ''}"
r = make_redis_interface("CACHE")
num_tasks, duration = parse_rate(rate)
# Check the count in redis
count = r.get(key)
if count is None:
# No key. Set the value to 1 and set the ttl of the key.
r.set(key, 1)
r.expire(key, duration)
return True
else:
# Key found. Check it.
if int(count) <= num_tasks:
# We're OK, run it.
r.incr(key, 1)
return True
else:
return False
关于python - Celery:对具有相同参数的任务进行速率限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29854102/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!