gpt4 book ai didi

python - Celery 作为网络发布/订阅事件

转载 作者:太空狗 更新时间:2023-10-29 18:13:01 27 4
gpt4 key购买 nike

我想设置一个网络发布/订阅事件系统,但还需要能够异步运行任务。我曾尝试让 celery 来完成繁重的工作,但我觉得我正在尝试填充一大堆东西来让它工作。

我有两台机器(输入和输出),它们都可以访问 RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到的运动)。我已经设置了 input_machine 启动 main.py,它启动了一个 celery 任务,该任务由 input_machine 上的工作人员监控到“输入”队列。这个任务只运行 while True 循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room',什么都不做)celery 任务到“输出”队列。

与此同时,在 output_machine 上,我有一个 celery 实例正在监视“输出”队列,其中包含一个名为('project.entered_room' 的任务,它响应有人进入房间)。

因此,当在 input_machine 上检测到输入时,任务将在输出机器上运行。我可以让它工作,但遇到很多导入问题和其他令人头疼的问题。有没有更简单的方法来完成这个?我做错了吗?我是否使用了错误的工具?

我研究了许多不同的框架,包括电路和扭曲。 Twisted 非常复杂,我觉得我会用手提钻敲钉子。

最佳答案

我建议跳过 Celery 并直接使用 Redis及其发布/订阅功能。例如,您可以通过运行 Docker image 来启动 Redis。 .然后在您的输入机器上,当检测到某些东西时,您将消息发布到一个 channel 。在您的输出机器上,您订阅该 channel 并对事件采取行动。

例如你的输入机器可以使用这样的东西:

import redis

def publish(message):
r = redis.Redis(host="redis")
r.publish("test-channel", message)

然后在输出端:

import time
import redis

def main():
r = redis.Redis(host="redis", decode_responses=True)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe("test-channel")

while True:
message = p.get_message()
if message:
print(message.get("data", ""))
# Do more things...
time.sleep(0.001)

通过这种方式,您可以在输入和输出机器之间发送纯文本或 JSON 数据。

在这里找到一个示例实现:https://github.com/moritz-biersack/simple-async-pub-sub

关于python - Celery 作为网络发布/订阅事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31063539/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com