gpt4 book ai didi

python - 如果数据库发生变化,如何停止执行长进程?

转载 作者:行者123 更新时间:2023-12-03 17:09:02 35 4
gpt4 key购买 nike

我有一个向 RabbitMQ 发送消息的 View 队列。

message = {'origin': 'Bytes CSV',
'data': {'csv_key': str(csv_entry.key),
'csv_fields': csv_fields
'order_by': order_by,
'filters': filters}}

...

queue_service.send(message=message, headers={}, exchange_name=EXCHANGE_IN_NAME,
routing_key=MESSAGES_ROUTING_KEY.replace('#', 'bytes_counting.create'))
对于我的消费者,我有一个很长的过程来生成 CSV。
def create(self, data):
csv_obj = self._get_object(key=data['csv_key'])
if csv_obj.status == CSVRequestStatus.CANCELED:
self.logger.info(f'CSV {csv_obj.key} was canceled by the user')
return

result = self.generate_result_data(filters=data['filters'], order_by=data['order_by'], csv_obj=csv_obj)
csv_data = self._generate_csv(result=result, csv_fields=data['csv_fields'], csv_obj=csv_obj)
file_key = self._post_csv(csv_data=csv_data, csv_obj=csv_obj)

csv_obj.status = CSVRequestStatus.READY
csv_obj.status_additional = CSVRequestStatusAdditional.SUCCESS
csv_obj.file_key = file_key
csv_obj.ready_at = timezone.now()
csv_obj.save(update_fields=['status', 'status_additional', 'ready_at', 'file_key'])

self.logger.info(f'CSV {csv_obj.name} created')
长过程发生在 self._generate_csv 内部, 因为 self.generate_result_data返回 queryset ,这是懒惰的。
如您所见,如果用户更改了 csv_request 的状态通过端点之前消息开始被使用,过程将不会被评估。我的目标是在执行 self._generate_csv 期间让这种情况发生。 .
到目前为止,我尝试使用 Threading ,但没有成功。
我怎样才能实现我的目标?
非常感谢!

最佳答案

你为什么不结帐 Celery 库?使用 celery with djangoRabbitMQ backend比直接利用rabbitmq 队列容易得多。
Celery 有一个内置函数 revoke终止正在进行的任务:

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)
  • related SO answer
  • celery docs

  • 对于您的用例,您可能需要类似(代码片段)的内容:
    ## celery/tasks.py
    from celery import app

    @app.task(queue="my_queue")
    def create_csv(message):
    # ...snip...
    pass

    ## main.py
    from celery import uuid, current_app

    def start_task(task_id, message):
    current_app.send_task(
    "create_csv",
    args=[message],
    task_id=task_id,
    )

    def kill_task(task_id):
    current_app.control.revoke(task_id, terminate=True)

    ## signals.py

    from django.dispatch import receiver
    from .models import MyModel
    from .main import kill_task

    # choose appropriate signal to listen for DB change
    @receiver(models.signals.post_save, sender=MyModel)
    def handler(sender, instance, **kwargs):
    kill_task(instance.task_id)
  • 使用 celery.uuid生成可以存储在数据库或缓存中的任务 ID,并使用相同的任务 ID 来控制任务,即请求终止。
  • 关于python - 如果数据库发生变化,如何停止执行长进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67440639/

    35 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com