- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我一直在看 Rick Branson 的 PyCon 视频:Messaging at Scale at Instagram .您可能想观看视频来回答这个问题。 Rick Branson 使用 Celery、Redis 和 RabbitMQ。为了让你跟上速度,每个用户都有一个 redis 列表作为他们的主页。每个列表都包含他们关注的人发布的照片的媒体 ID。
例如,贾斯汀比伯拥有 150 万粉丝。当他发布一张照片时,该照片的 ID 需要插入到他的每个关注者的每个单独的 redis 列表中。这称为 Fanout-On-Write 方法。但是,这种方法存在一些可靠性问题。它可以工作,但对于像 Justin Bieber 或 Lady Gaga 这样拥有数百万粉丝的人来说,在 Web 请求中执行此操作(您有 0-500 毫秒的时间来完成请求)可能会出现问题。届时,请求将超时。
于是Rick Branson决定使用Celery,一个基于分布式消息传递的异步任务队列/作业队列。任何繁重的工作,例如将媒体 ID 插入关注者列表,都可以在 Web 请求之外异步完成。请求将完成,celery 将继续将 ID 插入所有列表。
这种方法创造了奇迹。但同样,您不想将 Justin 的所有追随者一次性交付给 Celery,因为这会占用 Celery 工作人员的时间。为什么不让多个 worker 同时处理它以便更快地完成呢?卓见!您希望将这个 block 分解成更小的 block ,并让不同的工作人员处理每批。里克布兰森做了一批 10,000 名粉丝,他使用一种叫做游标的东西来为贾斯汀比伯的所有粉丝不断插入媒体 ID,直到完成。在视频中,他在 3:56 中谈到了这一点
我想知道是否有人可以对此进行更多解释并举例说明如何做到这一点。我目前正在尝试进行相同的设置。我使用 Andy McCurdy 的 redis-py python 客户端库与我的 redis 服务器通信。对于我服务中的每个用户,我都会创建一个 redis 关注者列表。
因此 ID 为 343 的用户将在以下键处有一个列表:
followers:343
我还为每个用户创建了一个主页列表。每个用户都有自己的列表。因此 ID 为 1990 的用户将在以下键处有一个列表:
homefeed:1990
在“followers:343”redis列表中,包含了所有关注用户343的人的ID。用户343有20007个关注者。下面,我将检索列表中从索引 0 开始一直到结尾 -1 的所有 ID,只是为了向您展示它的样子。
>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.
您看到的是所有关注用户 343 的用户 ID 的列表。
这是我的 proj/mydjangoapp/tasks.py,其中包含我的 insert_into_homefeed 函数:
from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')
@shared_task
def insert_into_homefeed(photo_id, user_id):
# Grab the list of all follower IDs from Redis for user_id.
r_server = redis.Redis(connection_pool=pool)
followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)
# Now for each follower_id in followers_list, find their homefeed key
# in Redis and insert the photo_id into that homefeed list.
for follower_id in followers_list:
homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
return "Fan Out Completed for %s" % (user_id)
在此任务中,当从 Django View 调用时,它将获取所有关注用户 343 的人的 ID,然后将照片 ID 插入到他们所有的主页列表中。
这是我在 proj/mydjangoapp/views.py 中的上传 View 。我基本上调用 celery 的 delay 方法并传递必要的变量,以便请求快速结束:
# Import the Celery Task Here
from mydjangoapp.tasks import insert_into_homefeed
@csrf_exempt
def Upload(request):
if request.method == 'POST':
data = json.loads(request.body)
newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url'])
newPhoto_ID = newPhoto.pk
insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
return HttpResponse("Request Completed")
我怎样才能按 10,000 个批处理?
最佳答案
视频中描述的方法是任务“链接”。
为了让您的任务方法作为一个链启动并运行,您需要添加一个额外的参数来代表关注者列表中的索引。该任务不是处理完整的关注者列表,而是只处理固定的批量大小,从传递给它的索引参数开始。完成时,任务应创建一个新任务并传递新索引。
INSERT_INTO_HOMEFEED_BATCH = 10000
@shared_task
def insert_into_homefeed(photo_id, user_id, index=0):
# Grab the list of all follower IDs from Redis for user_id.
r_server = redis.Redis(connection_pool=pool)
range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index
followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)
if not followers_list_batch:
return # zero followers or no more batches
# Now for each follower_id in followers_list_batch, find their homefeed key
# in Redis and insert the photo_id into that homefeed list.
for follower_id in followers_list:
homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)
这很有效,因为 Redis lists are ordered和 lrange 命令 doesn't return an error on out-of-range inputs .
关于python - Django、 celery 、Redis、RabbitMQ : Chained Tasks for Fanout-On-Writes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21007096/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!