gpt4 book ai didi

python - Celery 正确的任务组链

转载 作者:行者123 更新时间:2023-12-03 20:40:38 24 4
gpt4 key购买 nike

我有一个字符串列表列表,它们是文件名:

chunks_list = [["file_1", "file_2"], ["file_3", "file_4", "file_5"], ...]
我需要在 celery 任务中处理这些文件,所以我有一个任务:
@celery_app.task
def process_file_task(filename):
# do some staff with file
# (e.g. produce data to Kafka)
我需要 并行运行内部文件列表 .和 外部列表应该是连续的 .
接下来的处理方式应该是:
worker1: file_1
worker2: file_2
第一组完成后,下一组开始:
worker1: file_3
worker2: file_4
worker3: file_5
我尝试以这种方式运行我的任务:
sequence_tasks = []
for chunks in chunks_list:
sequence_tasks.append(
group([process_file_task.si(filename) for filename in chunks])
)

tasks_chain = chain(
tasks_group for tasks_group in sequence_tasks
)()
tasks_chain.get()
问题是我可能有数千个文件。而且这种运行任务的方式消耗了我所有的空闲内存,即使只有数百个。
请告诉我如何以正确的方式设计我的工作流程,以免内存不足。

最佳答案

你没有提到设置细节:

  • 你的 worker 在哪里跑? docker ? (k8s/ecs/..) ec2?
  • 什么 worker concurrency ?
  • 每个文件的任务(在同一组内)都必须在单独的工作人员上运行吗?

  • 总是有一个大写 - 你有多少 worker 的限制。
    假设您有 10 个工作人员,每个工作人员在一个组中并发 1 和 1,000 个任务。它们将几乎并行运行(因为您没有 1,000 名 worker ,只有 10 名),但这很好。 Celery 将确保在完成所有前 1,000 个任务之前,不会启动第二组中的任何任务。如果运行每个任务所需的时间相等,则每个工作人员将处理 100 个任务,然后将移动到下一个块。如果花费的时间不同,Celery 将通过将下一个任务分配给下一个空闲 worker 来进行优化。
    换句话说,您知道设置是什么(假设具有 16GB 和 8 个内核的 EC2 实例)。您还知道(我希望)单个任务可以占用的最大内存大小是多少。如果平均任务需要 1GB RAM,您最多可以运行 16 个并行度为 1 的任务或 6 个并行度为 2 的任务等)

    关于python - Celery 正确的任务组链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67244229/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com