gpt4 book ai didi

django - apply_async 后 Celery chord 不释放 redis pubsub channel

转载 作者:可可西里 更新时间:2023-11-01 11:46:07 25 4
gpt4 key购买 nike

我正在从我的 django 应用程序中的 celery 启动一个 chord 以响应请求。和弦正确执行,但 django 从未发布 pub-sub channel 。杀死 django 服务器释放 channel ,然后它从 redis-cli pubsub channels 中消失。

  • celery 4.1.1 或 4.2.0rc4
  • Redis 4.0.9
  • python 2.7.15
  • 在本地运行,1 个 celery worker,1 个 api 服务器
  • 在这种情况下结果并不重要(但文档说不要忽略它们)
  • 完整示例项目位于:https://github.com/awbacker/celerychord-issue

点击 /api/start/ 并在运行 celery 的选项卡中观察任务完成后,我看到剩余 5 个 channel 。杀死 django 会移除 channel ,杀死 celery worker 对它们没有影响。

redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"

我看到当一切正常时 channel 仍然存在,因此不会抛出任何错误。

谁能看出我做错了什么?我知道 celery 中报告了一些问题,但我不确定这是否来自于它们:

代码:

# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
task_id="chord-%s" % chord_key,
header=celery.group(
tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15))
),
# immutable = ignore results from parent
body=celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
)
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))

# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
logging.error(" ~ executing process-chunk: %s" % x)
return x * 2


@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
logging.error(" ~ executing post-step-1")
return y * 3


@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
logging.error(" ~ executing post-step-2")
return z * 5

最佳答案

你的 Chord 看起来很复杂,也许这就是 celery 遇到困难的原因,我建议你自己实现和弦逻辑,它不是很复杂。试试这个……我基本上是在等待使用和弦机制的任务

# --- endpoint.py ------------------------------------------- 
chain_tasks = celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True)
.apply_async()
chain_result= chain_tasks.get() // WAIT TO FINISH

group_task = celery.group(tasks.process_chunk.subtask(args=(chain_result,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15)).apply_async()
group_result = group_task.get()
return Response(data=dict(chord_key=chord_key, result=repr(group_result)))

不确定这是否正是您想要实现的目标,但我认为通过一些调整它会起作用。祝你好运。

关于django - apply_async 后 Celery chord 不释放 redis pubsub channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50617142/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com