gpt4 book ai didi

python - Django、 celery 、Redis、RabbitMQ : Chained Tasks for Fanout-On-Writes

转载 作者:IT王子 更新时间:2023-10-29 05:55:23 26 4
gpt4 key购买 nike

我一直在看 Rick Branson 的 PyCon 视频:Messaging at Scale at Instagram .您可能想观看视频来回答这个问题。 Rick Branson 使用 Celery、Redis 和 RabbitMQ。为了让你跟上速度,每个用户都有一个 redis 列表作为他们的主页。每个列表都包含他们关注的人发布的照片​​的媒体 ID。

例如,贾斯汀比伯拥有 150 万粉丝。当他发布一张照片时,该照片的 ID 需要插入到他的每个关注者的每个单独的 redis 列表中。这称为 Fanout-On-Write 方法。但是,这种方法存在一些可靠性问题。它可以工作,但对于像 Justin Bieber 或 Lady Gaga 这样拥有数百万粉丝的人来说,在 Web 请求中执行此操作(您有 0-500 毫秒的时间来完成请求)可能会出现问题。届时,请求将超时。

于是Rick Branson决定使用Celery,一个基于分布式消息传递的异步任务队列/作业队列。任何繁重的工作,例如将媒体 ID 插入关注者列表,都可以在 Web 请求之外异步完成。请求将完成,celery 将继续将 ID 插入所有列表。

这种方法创造了奇迹。但同样,您不想将 Justin 的所有追随者一次性交付给 Celery,因为这会占用 Celery 工作人员的时间。为什么不让多个 worker 同时处理它以便更快地完成呢?卓见!您希望将这个 block 分解成更小的 block ,并让不同的工作人员处理每批。里克布兰森做了一批 10,000 名粉丝,他使用一种叫做游标的东西来为贾斯汀比伯的所有粉丝不断插入媒体 ID,直到完成。在视频中,他在 3:56 中谈到了这一点

我想知道是否有人可以对此进行更多解释并举例说明如何做到这一点。我目前正在尝试进行相同的设置。我使用 Andy McCurdy 的 redis-py python 客户端库与我的 redis 服务器通信。对于我服务中的每个用户,我都会创建一个 redis 关注者列表。

因此 ID 为 343 的用户将在以下键处有一个列表:

followers:343

我还为每个用户创建了一个主页列表。每个用户都有自己的列表。因此 ID 为 1990 的用户将在以下键处有一个列表:

homefeed:1990

在“followers:343”redis列表中,包含了所有关注用户343的人的ID。用户343有20007个关注者。下面,我将检索列表中从索引 0 开始一直到结尾 -1 的所有 ID,只是为了向您展示它的样子。

>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.

您看到的是所有关注用户 343 的用户 ID 的列表。

这是我的 proj/mydjangoapp/tasks.py,其中包含我的 insert_into_homefeed 函数:

from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')

@shared_task
def insert_into_homefeed(photo_id, user_id):
# Grab the list of all follower IDs from Redis for user_id.
r_server = redis.Redis(connection_pool=pool)

followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)

# Now for each follower_id in followers_list, find their homefeed key
# in Redis and insert the photo_id into that homefeed list.

for follower_id in followers_list:
homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
return "Fan Out Completed for %s" % (user_id)

在此任务中,当从 Django View 调用时,它将获取所有关注用户 343 的人的 ID,然后将照片 ID 插入到他们所有的主页列表中。

这是我在 proj/mydjangoapp/views.py 中的上传 View 。我基本上调用 celery 的 delay 方法并传递必要的变量,以便请求快速结束:

# Import the Celery Task Here
from mydjangoapp.tasks import insert_into_homefeed


@csrf_exempt
def Upload(request):
if request.method == 'POST':
data = json.loads(request.body)
newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url'])
newPhoto_ID = newPhoto.pk
insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
return HttpResponse("Request Completed")

我怎样才能按 10,000 个批处理?

最佳答案

视频中描述的方法是任务“链接”。

为了让您的任务方法作为一个链启动并运行,您需要添加一个额外的参数来代表关注者列表中的索引。该任务不是处理完整的关注者列表,而是只处理固定的批量大小,从传递给它的索引参数开始。完成时,任务应创建一个新任务并传递新索引。

INSERT_INTO_HOMEFEED_BATCH = 10000

@shared_task
def insert_into_homefeed(photo_id, user_id, index=0):
# Grab the list of all follower IDs from Redis for user_id.
r_server = redis.Redis(connection_pool=pool)

range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index

followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)

if not followers_list_batch:
return # zero followers or no more batches

# Now for each follower_id in followers_list_batch, find their homefeed key
# in Redis and insert the photo_id into that homefeed list.
for follower_id in followers_list:
homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)

insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)

这很有效,因为 Redis lists are ordered和 lrange 命令 doesn't return an error on out-of-range inputs .

关于python - Django、 celery 、Redis、RabbitMQ : Chained Tasks for Fanout-On-Writes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21007096/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com