gpt4 book ai didi

python - 根据 celery 的结果路由到 worker ?

转载 作者:太空狗 更新时间:2023-10-30 03:03:30 30 4
gpt4 key购买 nike

我一直在使用 Storm最近,它包含一个名为 fields grouping 的概念(与 Celery 中的 group() 概念无关),其中具有特定键的消息将始终路由到同一个工作人员。

为了更清楚地理解我的意思,这里是来自 Storm wiki。

Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.

例如,从单词列表中读取,我想将以 a、b、c 开头的单词仅路由到 工作进程,将 d、e、f 路由到另一个,等等。

想要这样做的一个原因可能是因为我希望一个进程负责一组相同数据的数据库读/写,这样进程之间就不会出现竞争条件。

我正在尝试找出在 Celery 中实现此目标的最佳方法。

到目前为止,我最好的解决方案是为每个“组”(例如 letters.a、letters.d)使用一个队列,并确保工作进程的数量与队列的数量完全匹配。缺点是每个 worker 只能运行一个进程,以及各种情况,例如 worker 死亡或添加/删除 worker 。

我是 Celery 的新手,所以如果我提到的概念不正确,请纠正我。

最佳答案

这里涉及到一些胶水,但这是概念:

有一种方法可以使用CELERY_WORKER_DIRECT 将任务直接发送给不同的工作人员。 .将其设置为 True 会为每个工作人员创建一条路线。

我使用 celery.current_app.control.inspect().ping() 定期确定活跃的工作人员或确定活跃的主机。例如:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']

当我需要通过键进行路由时,我会对值进行哈希处理,然后对 worker 数量取模。这将平均分配任务,并将相同的 key 交给同一个 worker 。例如:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'

然后在执行任务时,我只需像这样指定交换和路由键:

my_task.apply_async(exchange='C.dq', routing_key=host)

有一些缺点:

  1. 据我所知,将一个工作线程的并发设置为 > 1 将使每个进程都使用同一个进程,从而否定整个练习。不幸的解决方法是将其保持在 1。
  2. 如果 worker 在 ping()apply_async 之间发生故障,消息将发送到不存在的路由。解决此问题的方法是捕获超时、重新断言可用主机、重新散列并重新发送。

关于python - 根据 celery 的结果路由到 worker ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18705833/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com