gpt4 book ai didi

python - 在 Django 中向长时间运行的方法发送信号

转载 作者:行者123 更新时间:2023-12-04 05:02:21 30 4
gpt4 key购买 nike

我想向 Celery 中的一个长时间运行的任务发送一个“暂停”信号,我试图找出最好的方法来做到这一点。在 View 中,我可以从数据库中提取对象的实例并告诉它进行保存,但这与 Celery 中的对象实例不同。该对象不会检查它是否已暂停。

从长期运行的类和任务中轮询数据库感觉很奇怪和不切实际,所以我正在考虑向我的实例发送一条消息。我考虑过使用 pubsub,但我更喜欢使用 Django 信号,因为它已经是一个 Django 项目。我可能以错误的方式接近这个。

这是一个不起作用的例子:

模型.py

class LongRunningClass(models.Model):
is_paused = models.BooleanField(default=False)
processed_files = models.IntegerField(default=0)
total_files = models.IntegerField(default=100)

def long_task(self):
remaining_files = self.total_files - self.processed_files
for i in xrange(remaining_files):
if not self.is_paused:
self.processed_files += 1
time.sleep(1)

# Task complete, let's save.
self.save()

View .py
def pause_task(self, pk):
lrc = LongRunningClass.objects.get(pk=pk)
lrc.is_paused = True
lrc.save()
return HttpResponse(json.dumps({'is_paused': lrc.is_paused}))


def resume_task(self, pk):
lrc = LongRunningClass.objects.get(pk=pk)
lrc.is_paused = False
lrc.save()

# Pretend this is a Celery task
lrc.long_task()

因此,如果我修改 models.py 以使用信号,我可以添加这些行,但它仍然无法正常工作:
pause_signal = django.dispatch.Signal(providing_args=['is_paused'])

@django.dispatch.receiver(pause_signal)
def pause_callback(sender, **kwargs):
if 'is_paused' in kwargs:
sender.is_paused = kwargs['is_paused']
sender.save()

这也不影响已经运行的实例化类。如何告诉在任务中运行的模型实例暂停?

最佳答案

Celery 任务是一个单独的进程。 Django 信号是标准的“观察者”模式,它在一个线程中工作,因此无法使用信号组织线程之间的通信。您需要从数据库加载对象以了解其属性是否已更改。

class LongRunningClass(models.Model):
is_paused = models.BooleanField(default=False)
processed_files = models.IntegerField(default=0)
total_files = models.IntegerField(default=100)

def get_is_paused(self):
db_obj = LongRunningClass.objects.get(pk=self.pk)
return db_obj.is_paused

def long_task(self):
remaining_files = self.total_files - self.processed_files
for i in xrange(remaining_files):
if not self.get_is_paused:
self.processed_files += 1
time.sleep(1)

# Task complete, let's save.
self.save()

设计不太好 - 你最好搬家 long_task到其他地方,用新加载的 LongRunningClass 操作例如,但它会完成这项工作。您可以在此处添加一些内存缓存 - 如果您不想经常打扰您的数据库。

顺便说一句:我不是 100% 肯定,但你可能在这里有另一个设计问题。当您使用这种周期运行非常长的任务时,这种情况非常罕见。考虑从您的程序中删除循环(您有队列!)。看一看:
@celery.task(run_every=2minutes)  # adding XX files for processing every XX minutes
def scheduled_task(lr_pk):
lr = LongRunningClass.objects.get(pk=lr_pk)
if not lr.is paused:
remaining_files = self.total_files - self.processed_files
for i in xrange(lr.files_per_iteration):
process_file.delay(lr.pk,i)

@celery.task(rate=1/m,queue='process_file') # processing each file
def process_file(lr_pk,i):
# do somthing with i
lr = LongRunningClass.objects.get(pk=lr_pk)
lr.processed_files += 1
lr.save()

您必须设置 celerybeat,并为此类任务创建单独的队列,以实现此解决方案。但因此,您将对您的程序有很多控制权 - 速度、并行执行和您的代码不会因为 sleep(1) 而挂起。 .如果为每个文件创建另一个模型,则可以控制处理哪些文件,不处理哪些文件,处理错误等。

关于python - 在 Django 中向长时间运行的方法发送信号,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15982080/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com