gpt4 book ai didi

python - 从 Django 应用程序触发分布式异步处理的良好架构是什么?

转载 作者:塔克拉玛干 更新时间:2023-11-02 23:10:07 24 4
gpt4 key购买 nike

我打算构建一个提供 REST api 的 Django 应用程序来添加一些处理作业。

对于每个请求,都会将一个作业添加到数据库中以供稍后处理。

作业可以通过自定义命令处理(./manage.py runJobs)

这些工作是 CPU 密集型的,所以如果我在有 4 个内核的机器上运行,我希望最多 4 个工作人员并行运行作业。

如果我需要扩展,我需要能够在不同的机器上运行更多的操作系统。

处理作业会占用大量内存,因此最好为每个作业创建一个新进程 - 也就是说 - 我认为 ./manage.py runJobs 运行 一个更好作业 和退出,而不是在同一进程中运行许多作业。

我需要确保竞争条件不会导致两名 worker 获得相同的工作。

我对 linux 进程之间的通信不是很熟悉,所以我认为来这里获取提示是个好主意。

您建议使用哪种架构来解决这个问题?

最佳答案

我想我找到了一个足够好的解决方案,它允许我将数据库用作信号量,并且不需要我在服务器上安装额外的软件。

它是这样的:

------ 自定义命令------

from django.db import transaction

def run_one_job():
candidate_jobs = Job.objects.filter(status='PENDING')
job = lock_one_job_from_list(candidate_jobs)
if job:
process(job) # whatever that means
job.status = 'DONE'
job.save()

def lock_one_job_from_list(jobs):
for job in jobs:
_job = attempt_lock_job(job.id)
if _job:
return _job


@transaction.commit_on_success
def attempt_lock_job(id):
j = Job.objects.select_for_update(id=id)[0] # wait until you get a write lock for that record
if j.status == 'PENDING':
j.status = 'RUNNING'
j.save()
return j

------ worker.sh --------

while [ 1 ]
do
./manage.py runJobs
sleep 1
done

然后我可以生成与可用核心一样多的 worker.sh 实例。这并不理想,因为每秒都会有一堆工作人员轮询数据库,但它确实解决了最大的风险:将同一个作业运行两次。我想我会接受这个 :-)。

如果您发现此方法有任何漏洞,请告诉我。

关于python - 从 Django 应用程序触发分布式异步处理的良好架构是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19007100/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com