gpt4 book ai didi

python RQ : pattern for callback

转载 作者:太空狗 更新时间:2023-10-30 03:01:45 26 4
gpt4 key购买 nike

我现在有大量文档要处理,我正在使用 Python RQ 来并行处理任务。

我希望在每个文档上执行不同的操作时完成一系列工作。例如:A -> B -> C 表示将文档传递给函数A,在之后A 已完成,继续执行 B 和最后的 C

但是,Python RQ 似乎不能很好地支持管道内容。

这是一个简单但有点肮脏的做法。总之,管道中的每个函数都以嵌套的方式调用它的下一个函数。

例如,对于管道A->B->C

在顶层,一些代码是这样写的:

q.enqueue(A, the_doc)

其中 q 是 Queue 实例,在函数 A 中有如下代码:

q.enqueue(B, the_doc)

而在B中,有这样的东西:

q.enqueue(C, the_doc)

还有比这更优雅的方式吗?例如 ONE 函数中的一些代码:

q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after = A)
q.enqueue(C, the_doc, after= B)

depends_on参数最接近我的要求,但是,运行如下:


A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)

不会工作。因为 q.enqueue(B, depends_on=A_job )A_job = q.enqueue(A, the_doc) 执行后立即执行。当 B 入队时,A 的结果可能还没有准备好,因为它需要时间来处理。

附言:

如果 Python RQ 不太擅长这个,我还可以使用 Python 中的什么工具来实现相同的目的:

  1. 循环并行化
  2. 管道处理支持

最佳答案

By the time B is enqueued, the result from A might not be ready as it takes time to process.

我不确定您最初发布问题时是否确实如此,但无论如何,现在情况并非如此。事实上,depends_on 功能正是为您描述的工作流程而设计的。

确实这两个函数是立即连续执行的。

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )

但是在 A 完成之前,worker 不会执行 B。直到 A_job 成功执行,B.status == 'deferred'。一旦 A.status == 'finished'B 就会开始运行。

这意味着 BC 可以像这样访问和操作它们的依赖项的结果:

import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
time.sleep(100)
return 'result A'

def B():
time.sleep(100)
current_job = get_current_job(conn)
a_job_id = current_job.dependencies[0].id
a_job_result = q.fetch_job(a_job_id).result
assert a_job_result == 'result A'
return a_job_result + ' result B'


def C():
time.sleep(100)
current_job = get_current_job(conn)
b_job_id = current_job.dependencies[0].id
b_job_result = q.fetch_job(b_job_id).result
assert b_job_result == 'result A result B'
return b_job_result + ' result C'

worker最终会打印'result A result B result C'

此外,如果您在队列中有很多作业并且 B 可能在执行之前等待一段时间,您可能需要显着增加 result_ttl 或使其无限期result_ttl=-1。否则,无论为 result_ttl 设置多少秒后,A 的结果都将被清除,在这种情况下,B 将无法再访问它并返回所需的结果。

但是,设置 result_ttl=-1 具有重要的内存含义。这意味着您的作业结果将永远不会被自动清除,并且内存将按比例增长,直到您手动从 Redis 中删除这些结果。

关于 python RQ : pattern for callback,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24295090/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com