gpt4 book ai didi

celery - 带有 celery 的猎鹰 python 示例

转载 作者:行者123 更新时间:2023-12-01 10:31:51 25 4
gpt4 key购买 nike

import falcon
import json
from tasks import add
from waitress import serve


class tasksresource:
def on_get(self, req, resp):
"""Handles GET requests"""
self.result = add.delay(1, 2)
self.context = {'ID': self.result.id, 'final result': self.result.ready()}
resp.body = json.dumps(self.context)



api = falcon.API()
api.add_route('/result', tasksresource())
# api.add_route('/result/task', taskresult())
if __name__ == '__main__':
serve(api, host='127.1.0.1', port=5555)

我如何从 json 有效负载(发布数据)中获取任务 ID
并为其添加路线

最佳答案

这里是一个小例子。文件结构:

/project
__init__.py
app.py # routes, falcon etc.
tasks.py # celery
example.py # script for demonstration how it works

应用程序.py:
import json

import falcon
from tasks import add
from celery.result import AsyncResult


class StartTask(object):

def on_get(self, req, resp):
# start task
task = add.delay(4, 4)
resp.status = falcon.HTTP_200
# return task_id to client
result = {'task_id': task.id}
resp.body = json.dumps(result)


class TaskStatus(object):

def on_get(self, req, resp, task_id):
# get result of task by task_id and generate content to client
task_result = AsyncResult(task_id)
result = {'status': task_result.status, 'result': task_result.result}
resp.status = falcon.HTTP_200
resp.body = json.dumps(result)


app = falcon.API()

# registration of routes
app.add_route('/start_task', StartTask())
app.add_route('/task_status/{task_id}', TaskStatus())

任务.py:
from time import sleep

import celery


app = celery.Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@app.task
def add(x, y):
"""
:param int x:
:param int y:
:return: int
"""
# sleep just for demonstration
sleep(5)

return x + y

现在我们需要开始 celery应用。转至 project文件夹并运行:
celery -A tasks worker --loglevel=info

之后我们需要开始 Falcon应用。转至 project文件夹并运行:
gunicorn app:app

行。全部都准备好了。
example.py是小客户端,可以帮助理解:
from time import sleep

import requests
# start new task
task_info = requests.get('http://127.0.0.1:8000/start_task')
task_info = task_info.json()

while True:
# check status of task by task_id while task is working
result = requests.get('http://127.0.0.1:8000/task_status/' + task_info['task_id'])
task_status = result.json()

print task_status

if task_status['status'] == 'SUCCESS' and task_status['result']:
print 'Task with id = %s is finished' % task_info['task_id']
print 'Result: %s' % task_status['result']
break
# sleep and check status one more time
sleep(1)

只需调用 python ./example.py你应该看到这样的东西:
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'SUCCESS', u'result': 8}
Task with id = 76542904-6c22-4536-99d9-87efd66d9fe7 is finished
Result: 8

希望这对您有所帮助。

关于celery - 带有 celery 的猎鹰 python 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41697221/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com