
python - External API rate limiting with RabbitMQ and Celery

Reposted. Author: 太空宇宙. Updated: 2023-11-04 05:24:24

I am using an external REST API that limits my API requests to 1 CPS (one call per second).

The architecture is as follows:

[Image: architecture diagram]

Versions:

  • flask
  • RabbitMQ 3.6.4
  • AMQP 1.4.9
  • kombu 3.0.35
  • celery 3.1.23
  • python 2.7

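The API client sends web requests to the internal API, which processes each request and controls the rate at which tasks are sent to RabbitMQ. A task can take anywhere from 5 to 120 seconds, and in some situations tasks queue up and are then sent to the external API at a higher rate than the one defined, which results in numerous failed requests (about 5% of requests fail).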

Possible solutions:

  • Increase the external API limit
  • Add more workers
  • Keep track of failed tasks and retry them later

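Although these solutions might work, they do not fix the implementation of my rate limiter, and they do not control the actual rate at which my workers process API requests. Down the line I really need to control the external rate.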

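I believe that controlling the rate at which RabbitMQ delivers messages to the workers could be a better option. I found the RabbitMQ prefetch option, but I am not sure it is the right tool. Can anyone recommend other options for controlling the rate at which messages are sent to consumers?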


Best answer

You will need to create your own rate limiter, because Celery's rate limit works per worker only and "does not work as you expect it to".
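To illustrate why a per-worker limit does not protect a shared external quota, here is a minimal sketch that simulates several workers, each enforcing its own limit of one call per second. It is a toy model with a simulated clock, not Celery code; `PerWorkerLimiter` and `simulate` are hypothetical names introduced only for this illustration.

```python
class PerWorkerLimiter:
    """Hypothetical limiter: at most one call per `interval` seconds, per worker."""

    def __init__(self, interval=1.0):
        self.interval = interval
        self.last_call = None  # simulated time of the last allowed call

    def allow(self, now):
        # Allow the call only if enough simulated time has passed for THIS worker
        if self.last_call is None or now - self.last_call >= self.interval:
            self.last_call = now
            return True
        return False


def simulate(workers, seconds, interval=1.0, tick=0.25):
    """Count the calls reaching the external API when each worker limits itself."""
    limiters = [PerWorkerLimiter(interval) for _ in range(workers)]
    calls = 0
    for step in range(int(seconds / tick)):
        now = step * tick
        for limiter in limiters:
            # No state is shared between workers, so each one passes its own check
            if limiter.allow(now):
                calls += 1
    return calls


one_worker = simulate(workers=1, seconds=10)
four_workers = simulate(workers=4, seconds=10)
print(one_worker, four_workers)  # 10 40
```

In this model one worker produces 10 calls in 10 seconds, while four workers produce 40, four times a 1 CPS quota. Keeping the counter in a store shared by all workers, such as Redis, avoids this.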

I have personally found that it breaks completely when trying to add new tasks from within another task.

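I think the range of requirements for rate limiting is too wide and depends too much on the application itself, so Celery's implementation is intentionally very simple.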

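Here is an example I created using Celery + Django + Redis. Basically, it adds a convenience method to your App.Task class that keeps track of your task execution rate in Redis. If the rate is too high, the task is retried at a later time.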

This example sends an SMTP message, but that can easily be replaced with an API call.

The algorithm is inspired by Figma: https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/

https://gist.github.com/Vigrond/2bbea9be6413415e5479998e79a1b11a

# Rate limiting with Celery + Django + Redis
# Multiple Fixed Windows Algorithm inspired by Figma https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/
# and Celery's sometimes ambiguous, vague, and one-paragraph documentation
#
# Celery's Task is subclassed and the is_rate_okay function is added


# celery.py or however your App is implemented in Django
import os
import math
import time

from celery import Celery, Task
from django_redis import get_redis_connection
from django.conf import settings
from django.utils import timezone


app = Celery('your_app')

# Get Redis connection from our Django 'default' cache setting
redis_conn = get_redis_connection("default")

# We subclass the Celery Task
class YourAppTask(Task):
    def is_rate_okay(self, times=30, per=60):
        """
        Checks to see if this task is hitting our defined rate limit too much.
        This example sets a rate limit of 30/minute.

        times (int): The "30" in "30 times per 60 seconds".
        per (int): The "60" in "30 times per 60 seconds".

        The Redis structure we create is a Hash of timestamp keys with counter values
        {
            '1560649027.515933': '2',  // unlikely to have more than 1
            '1560649352.462433': '1',
        }

        The Redis key is expired after the amount of 'per' has elapsed.
        The algorithm totals the counters and checks against 'times'.

        This algorithm currently does not implement the "leniency" described
        at the bottom of the figma article referenced at the top of this code.
        This is left up to you and depends on application.

        Returns True if under the limit, otherwise False.
        """

        # Get a timestamp accurate to the microsecond
        timestamp = timezone.now().timestamp()

        # Set our Redis key to our task name
        key = f"rate:{self.name}"

        # Create a pipeline to execute redis code atomically
        pipe = redis_conn.pipeline()

        # Increment our current task hit in the Redis hash
        pipe.hincrby(key, timestamp)

        # Grab the current expiration of our task key
        pipe.ttl(key)

        # Grab all of our task hits in our current frame (of 60 seconds)
        pipe.hvals(key)

        # This returns a list of our command results. [current task hits, expiration, list of all task hits,]
        result = pipe.execute()

        # If our expiration is not set (negative TTL), set it. This is not part of the atomicity of the pipeline above.
        if result[1] < 0:
            redis_conn.expire(key, per)

        # We must convert byte to int before adding up the counters and comparing to our limit
        return sum(int(count) for count in result[2]) <= times


app.Task = YourAppTask
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

...

# SMTP Example
import random
from YourApp.celery import app
from django.core.mail import EmailMessage

# We set infinite max_retries so backlogged email tasks do not disappear
@app.task(name='smtp.send-email', max_retries=None, bind=True)
def send_email(self, to_address):

    if not self.is_rate_okay():
        # We implement a random countdown between 30 and 60 seconds
        # so tasks don't come flooding back at the same time
        raise self.retry(countdown=random.randint(30, 60))

    message = EmailMessage(
        'Hello',
        'Body goes here',
        'from@yourdomain.com',
        [to_address],
    )
    message.send()
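
The counting logic of is_rate_okay can be tried out without Redis. The sketch below is an in-memory stand-in that assumes a single process: a dict replaces the Redis hash, and an explicit `now` argument replaces both the clock and the key expiration. `FixedWindowCounter` is a hypothetical name, and because its state is not shared it is not a substitute for the Redis version when several workers are running.

```python
class FixedWindowCounter:
    """Hypothetical in-memory model of the hash-plus-expiry counting shown above."""

    def __init__(self, times=30, per=60):
        self.times = times        # allowed hits per window
        self.per = per            # window length in seconds
        self.window_start = None  # when the current window began
        self.hits = {}            # timestamp -> counter, like the Redis hash

    def is_rate_okay(self, now):
        # Emulate the Redis key expiring `per` seconds after its TTL was set
        if self.window_start is None or now - self.window_start >= self.per:
            self.window_start = now
            self.hits = {}
        # Record this hit first, then compare the window total with the limit
        self.hits[now] = self.hits.get(now, 0) + 1
        return sum(self.hits.values()) <= self.times


# times=1, per=1 corresponds to the 1 CPS limit from the question
limiter = FixedWindowCounter(times=1, per=1)
results = [limiter.is_rate_okay(t) for t in (0.0, 0.4, 0.9, 1.0, 1.5)]
print(results)  # [True, False, False, True, False]
```

As in the Redis version, a rejected attempt still increments the counter, so retries that arrive inside the same window count against it.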

Regarding python - External API rate limiting with RabbitMQ and Celery, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/39312700/
