gpt4 book ai didi

python - 使用 celery 处理巨大的文本文件

转载 作者:太空狗 更新时间:2023-10-29 18:08:16 26 4
gpt4 key购买 nike

背景

我正在研究使用 celery (3.1.8) 来处理每个巨大的文本文件 (~30GB)。这些文件在 fastq 中格式并包含大约 118M 测序“读数”,它们基本上都是 header 、DNA 序列和质量字符串的组合)。此外,这些序列来自双端测序运行,因此我同时迭代两个文件(通过 itertools.izip)。我希望能够做的是获取每对读取,将它们发送到队列,并让它们在我们集群中的一台机器上进行处理(不管是哪台机器)以返回清理后的版本如果需要进行清洁(例如,基于质量),则读取。

我已经设置了 celery 和 rabbitmq,我的 workers 启动如下:

celery worker -A tasks --autoreload -Q transient 

配置如下:

from kombu import Queue

BROKER_URL = 'amqp://guest@godel97'
CELERY_RESULT_BACKEND = 'rpc'
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT=['pickle', 'json']
CELERY_TIMEZONE = 'America/New York'
CELERY_ENABLE_UTC = True
CELERYD_PREFETCH_MULTIPLIER = 500

CELERY_QUEUES = (
Queue('celery', routing_key='celery'),
Queue('transient', routing_key='transient',delivery_mode=1),
)

我选择使用 rpc 后端和 pickle 序列化来提高性能,也没有将任何内容写入“ transient ”队列中的磁盘(通过 delivery_mode)。

celery 启动

为了设置 celery 框架,我首先在 64 路机器上启动 rabbitmq 服务器(3.2.3,Erlang R16B03-1),将日志文件写入快速/tmp 磁盘。工作进程(如上所述)在集群上的每个节点(大约 34 个)上启动,范围从 8 路到 64 路 SMP,总共 688 个内核。因此,我有大量可用的 CPU 供工作人员用来处理队列。

作业提交/性能

celery 启动并运行后,我通过 ipython notebook 提交作业,如下所示:

files = [foo, bar]
f1 = open(files[0])
f2 = open(files[1])
res = []
count = 0
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)):
count += 1
res.append(tasks.process_read_pair.s(r1, r2))
if count == 10000:
break
t.stop()
g = group(res)
for task in g.tasks:
task.set(queue="transient")

读取 10000 对大约需要 1.5 秒。然后,我在群上调用延迟提交给 worker ,大约需要20s,如下:

result = g.delay()

使用 rabbitmq 控制台进行监控,我发现我做的不错,但速度还不够快。

rabbitmq graph

问题

那么,有什么办法可以加快速度吗?我的意思是,我希望看到每秒至少处理 50,000 个读取对,而不是 500 个。我的 celery 配置中是否有明显遗漏的内容?我的 worker 和兔子日志基本上是空的。想要一些关于如何提高我的表现的建议。每个单独的读取对处理也非常快:

[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec

到目前为止

到目前为止,我用谷歌搜索了所有我能想到的 celery 、性能、路由、rabbitmq 等。我浏览了 celery 网站和文档。如果我不能获得更高的性能,我将不得不放弃这种方法而采用另一种解决方案(基本上将工作分成许多较小的物理文件并直接在每个计算节点上使用多处理或其他方式处理它们)。但是,如果无法将此负载分散到集群中,那将是一种耻辱。另外,这似乎是一个非常优雅的解决方案。

在此先感谢您的帮助!

最佳答案

不是答案,但评论时间太长。

让我们把问题缩小一点...

首先,尝试跳过所有正常的逻辑/消息准备,只使用您当前的库执行尽可能紧凑的发布循环。看看你得到什么比率。这将确定您的非队列相关代码是否存在问题。

如果仍然很慢,设置一个新的 python 脚本但使用 amqplib而不是 celery 。我已经设法让它以超过 6000/s 的速度发布,同时在中档桌面上做有用的工作(和 json 编码),所以我知道它的性能。这将确定问题是否与 celery 库有关。 (为了节省您的时间,我从我的一个项目中截取了以下内容,希望在简化时不会破坏它...)

from amqplib import client_0_8 as amqp
try:
lConnection = amqp.Connection(
host=###,
userid=###,
password=###,
virtual_host=###,
insist=False)
lChannel = lConnection.channel()
Exchange = ###

for i in range(100000):
lMessage = amqp.Message("~130 bytes of test data..........................................................................................................")
lMessage.properties["delivery_mode"] = 2
lChannel.basic_publish(lMessage, exchange=Exchange)

lChannel.close()
lConnection.close()

except Exception as e:
#Fail

在上述两种方法之间,您应该能够将问题追查到队列、库或您的代码之一。

关于python - 使用 celery 处理巨大的文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21438385/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com