gpt4 book ai didi

python - 如何在 python-rq 中的计划作业和排队作业之间创建 `` dependent_on`` 关系

转载 作者:行者123 更新时间:2023-12-01 18:09:47 26 4
gpt4 key购买 nike

我有一个 Web 服务(Python 3.7、Flask 1.0.2),其工作流程包含 3 个步骤:

  • 第 1 步:向商业排队系统(IBM 的 LSF)提交远程计算作业
  • 第 2 步:每 61 秒轮询一次远程计算作业状态(由于缓存了作业状态结果,因此需要 61 秒)
  • 第 3 步:如果第 2 步返回远程计算作业状态 ==“DONE”,则进行数据后处理

远程计算作业的长度是任意的(秒到天之间),每个步骤都取决于前一个步骤的完成情况:

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
job1 = q.enqueue(step1)
job2 = q.enqueue(step2, depends_on=job1)
job3 = q.enqueue(step3, depends_on=job2)

但是,最终所有工作人员(4 个工作人员)都会进行轮询(4 个客户端请求中的第 2 步),同时他们应该继续执行其他传入请求的第 1 步以及已成功通过第 2 步的工作流程的第 3 步。

每次投票后应释放工作人员。他们应该定期返回步骤 2 进行下一次轮询(每个作业最多每 61 秒一次),并且如果远程计算作业轮询未返回“DONE”,则重新对轮询作业进行排队。

<小时/>

此时我开始使用rq-scheduler(因为间隔和重新排队功能听起来很有希望):

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
s = Scheduler('default')

job1 = q.enqueue(step1, REQ_ID)

job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
job2.meta['interval'] = 61
job2.origin = 'default'
job2.save()
s.enqueue_job(job2)

job3 = q.enqueue(step3, REQ_ID, depends_on=job2)

Job2 已正确创建(包括与 job1 的 depends_on 关系,但 s.enqueue_job() 立即执行它,忽略其与 job1 的关系。(q.enqueue_job() 的函数文档字符串)实际上说的是立即执行...)。

当 job2 放入调度程序而不是队列时,如何在 job1、job2 和 job3 之间创建 depends_on 关系? (或者,如何将 job2 交给调度程序,而不是立即执行 job2 并等待 job1 完成?)

<小时/>

出于测试目的,步骤如下所示:

def step1():
print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
time.sleep(20)
print(f' <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
return True

def step2():
print(f' --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
time.sleep(10)
print(f' <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
return True

def step3():
print(f' --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
time.sleep(10)
print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
return True

我收到的输出是这样的:

worker_1     | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2 | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2 | --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1 | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...

job2 没有等待 job1 完成...

<小时/>
#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1

最佳答案

我对此问题的解决方案仅使用rq(并且不再使用rq_scheduler):

  1. 升级到最新的 python-rq 包:

    # requirements.txt
    ...
    rq==1.1.0
  2. 为轮询作业创建专用队列,并相应地将作业排入队列(使用 depends_on 关系):

    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue('default')
    p = Queue('pqueue')
    job1 = q.enqueue(step1)
    job2 = p.enqueue(step2, depends_on=job1) # step2 enqueued in polling queue
    job3 = q.enqueue(step3, depends_on=job2)
  3. 为轮询队列派生专用工作线程。它继承自标准 Worker 类:

    class PWorker(rq.worker.Worker):
    def execute_job(self, *args, **kwargs):
    seconds_between_polls = 65
    job = args[0]
    if 'lastpoll' in job.meta:
    job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()
    if job_timedelta < seconds_between_polls:
    sleep_period = seconds_between_polls - job_timedelta
    time.sleep(sleep_period)
    job.meta['lastpoll'] = datetime.utcnow()
    job.save_meta()

    super().execute_job(*args, **kwargs)

    PWorker 通过向作业的元数据'lastpoll' 添加时间戳来扩展 execute_job 方法。

    如果轮询作业进来,并且具有 lastpoll 时间戳,工作线程会检查自 lastpoll 以来的时间段是否大于 65 秒。如果是,它将当前时间写入'lastpoll'并执行轮询。如果没有,它会休眠直到 65 秒结束,然后将当前时间写入 'lastpoll' 并执行轮询。没有 lastpoll 时间戳的作业是第一次轮询,工作线程创建时间戳并执行轮询。

  4. 创建一个专用异常(由任务函数抛出)和一个异常处理程序来处理它:

    # exceptions.py

    class PACError(Exception):
    pass

    class PACJobRun(PACError):
    pass

    class PACJobExit(PACError):
    pass
    # exception_handlers.py

    def poll_exc_handler(job, exc_type, exc_value, traceback):
    if exc_type is PACJobRun:
    requeue_job(job.get_id(), connection=job.connection)
    return False # no further exception handling
    else:
    return True # further exception handling
    # tasks.py

    def step2():
    # GET request to remote compute job portal API for status
    # if response == "RUN":
    raise PACJobRun
    return True

    当自定义异常处理程序捕获自定义异常(这意味着远程计算作业仍在运行)时,它会在轮询队列中重新排队该作业。

  5. 将自定义异常处理程序放入异常处理层次结构中:

    # manage.py

    @cli.command('run_pworker')
    def run_pworker():
    redis_url = app.config['REDIS_URL']
    redis_connection = redis.from_url(redis_url)
    with rq.connections.Connection(redis_connection):
    pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler])
    pworker.work()

这个解决方案的好处是它只用几行额外代码就扩展了 python-rq 的标准功能。另一方面,额外的队列和工作线程会增加复杂性……

关于python - 如何在 python-rq 中的计划作业和排队作业之间创建 `` dependent_on`` 关系,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58695844/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com