gpt4 book ai didi

python - 如何在 Heroku 中的 Django 中安排单次事件?

转载 作者:行者123 更新时间:2023-12-04 20:35:28 24 4
gpt4 key购买 nike

我有一个关于在 Heroku 的分布式环境中安排将来触发一次的事件所必需的软件设计的问题。

我认为写我想要实现的东西会更好,但我确实做过研究,即使经过两个小时的工作,我自己也无法弄清楚。

假设在我的 views.py我有一个功能:

def after_6_hours():
print('6 hours passed.')


def create_game():
print('Game created')
# of course time will be error, but that's just an example
scheduler.do(after_6_hours, time=now + 6)

所以我想要实现的是能够运行 after_6_hourscreate_game 之后正好 6 小时运行已被调用。现在,如您所见,这个函数是在通常的 clock.py 中定义的。或 task.py或等等等等文件。

现在,我怎样才能让我的整个应用程序一直在 Heroku 中运行,并且能够将这个作业添加到这个想象中的队列中 - scheduler图书馆?

附带说明一下,我不能使用 Heroku 的 Temporizer 插件。 APScheduler 和 Python rq 的组合看起来很有希望,但示例很简单,都安排在 clock.py 内的同一个文件中,而我只是不知道如何将所有内容与我拥有的设置联系在一起。提前致谢!

最佳答案

在 Heroku 中,您可以让您的 Django 应用程序在 Web Dyno 中运行。 ,它将负责为您的应用程序提供服务并安排任务。
例如(请注意,我没有测试运行代码):

创建 after_hours.py ,它将具有您要安排的功能(请注意,我们也将在 worker 中使用相同的源代码)。

def after_6_hours():
print('6 hours passed.')

在您的 views.py使用 rq (请注意,仅 rq 在您的情况下是不够的,因为您必须安排任务)和 rq-scheduler :
from redis import Redis
from rq_scheduler import Scheduler
from datetime import timedelta

from after_hours import after_6_hours

def create_game():
print('Game created')

scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
scheduler.enqueue_in(timedelta(hours=6), after_6_hours) #schedules the job to run 6 hours later.

调用 create_game()应该安排 after_6_hours() 在 6 小时后运行。

提示:您可以配置 Redis在 Heroku 中使用 Redis To Go 添加在。

下一步是运行 rqscheduler 工具,它每分钟轮询 Redis 以查看当时是否有任何作业要执行并将其放入队列中( rq 工作人员将监听该队列)。

现在,在 Worker Dyno创建文件 after_hours.py
def after_6_hours():
print('6 hours passed.')
#Better return something

并创建另一个文件 worker.py :
import os

import redis
from rq import Worker, Queue, Connection

from after_hours import after_6_hours

listen = ['high', 'default', 'low'] # while scheduling the task in views.py we sent it to default

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
with Connection(conn):
worker = Worker(map(Queue, listen))
worker.work()

并运行 worker.py
python worker.py

这应该在 Worker Dyno 中运行计划任务(在本例中为 afer_6_hours)。
请注意,这里的关键是让 worker 也可以使用相同的源代码(在本例中为 after_hours.py)。 rq 中也强调了这一点。 docs

Make sure that the worker and the work generator share exactly the same source code.



如果有帮助, docs 中有提示处理不同的代码库。

For cases where the web process doesn't have access to the source code running in the worker (i.e. code base X invokes a delayed function from code base Y), you can pass the function as a string reference, too.

 q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)


希望 rq-scheduler也尊重这种传递字符串而不是函数对象的方式。

你可以使用任何模块/调度工具(Celery/RabbitMQ、APScheduler 等),只要你了解这个东西。

关于python - 如何在 Heroku 中的 Django 中安排单次事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36635755/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com