gpt4 book ai didi

python-3.x - Celery 是否设计用于运行在将数据推送到 RabbitMQ 队列并使用时执行的任务?

转载 作者:行者123 更新时间:2023-12-05 04:52:18 25 4
gpt4 key购买 nike

架构

我计划通过 MQTT 将 IoT 节点的数据发布到 RabbitMQ 队列中。然后处理数据,需要将状态保存到 Redis 中。

RabitMQTT Architecture

当前实现

我为 RabbitMQ 启动了一个 docker 容器并将其配置为启用 MQTT(端口:1883)。

基于 RabbitMQ's MQTT Plugin Documentation

  • 来自 MQTT 端口的数据被发送到 amq.topic Exchange 并订阅类似于 MQTT 主题的队列名称,其中 / 被替换为 例如hello/test MQTT 主题 -> hello.test RabbitMQ 队列。

通过 AMQP 端口进行基本消费

使用pika 的简单示例如下,完美运行

import argparse, sys, pika
def main():

args = parse_arguments()

# CLI TAKES IN BROKER PARAMETERS in `args`
# Removed for brevity
broker_credentials = pika.PlainCredentials(args.rabbit_user, args.rabbit_pass)


print("Setting Up Connection with RabbitMQ Broker")

connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=args.rabbit_host,
port=args.rabbit_port,
credentials=broker_credentials
)
)

# Create Channel
print("Creating Channel")

channel = connection.channel()

# Declare the Exchange

print("Declaring the exchange to read incoming MQTT Messages")
# Exchange to read mqtt via RabbitMQ is always `amq.topic`
# Type of exchange is `topic` based
channel.exchange_declare(exchange='amq.topic', exchange_type='topic', durable=True)

# the Queue Name for MQTT Messages is the MQTT-TOPIC name where `/` is replaced by `.`
# Let RabbitMQ create the name for us
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind the Queue with some specific topic where you wish to get the data from

channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key=args.rabbit_topic)

# Start Consuming from Queue

channel.basic_consume(queue=queue_name, on_message_callback=consumer_callback, auto_ack=True)

print("Waiting for MQTT Payload")

channel.start_consuming()




if __name__ == "__main__":

try:
main()
except KeyboardInterrupt:
print("CTRL+C Pressed")
sys.exit(0)

要求

我只发现了 Celery 并且正在研究它。在很多例子中,通常是外部脚本触发任务,然后工作人员解决任务并将其保存到 backend(在我的例子中是 Redis)

例如

app = Celery('tasks', broker='RABBITMQ_BROKER_URL')

@app.task

def process_iot_data(incoming_data):
time.sleep(1.0)
# Do Some Necessary data processing and store the processed state in Redis

Celery 设计中的困惑

我浏览了很多博客,其中 Celery 任务与 REST API 一起使用,并且在调用 API 时,任务被排队并执行,状态被保存在后端。

我找不到任何示例,在初始化 Celery(..) 应用程序期间,我可以实例化必要的 exchangeamq.topic 以及我通过上面的消费者代码使用 pika 完成的事情。

当推送 RabbitMQ 队列中的数据时,我无法意识到在任务排队的情况下使用 celery 的可能方式是什么。与发送 REST API 请求不同,我希望在相应队列中插入数据时使用 celery 任务处理队列中的传入数据。

这是否可以通过 Celery 实现,还是我应该坚持使用 pika 并在回调函数中编写内容?

目标

我希望做一些模拟,在其中我可以将消费者扩大许多倍,并尝试查看我的 dockerized 消费者应用程序能够承受多大的数据和处理量。

最佳答案

简而言之——没有。

Celery 不是为处理发送到消息队列系统的任意数据而设计的。它旨在生产/消费包含序列化 Celery 任务详细信息的消息,以便消费者可以在另一端执行特定任务,并将结果放入结果后端。

但是,我坚信几乎所有您能想到的任意消息都可以(以这种方式或另一种方式)包装到 Celery 任务中。但真正的问题是当您不希望 Celery 出现在其中一端(生产者或消费者)时。生产者可以使用方便的 send_task() 函数发送任务,而无需共享包含任务定义的代码。

关于python-3.x - Celery 是否设计用于运行在将数据推送到 RabbitMQ 队列并使用时执行的任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66573136/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com