
python - Docker, Celery: combined setup fails

Reposted · Author: 行者123 · Updated: 2023-12-02 18:33:57

I'm trying to build a FastAPI-based API that uses Celery, Redis, and RabbitMQ for background tasks. When I run docker-compose up, the Redis, RabbitMQ, and Flower services come up fine, and I can reach the Flower dashboard.

But it gets stuck at the Celery part.

The error:

 rabbitmq_1       | 2020-09-08 06:32:38.552 [info] <0.716.0> connection <0.716.0> (172.22.0.6:49290 -> 172.22.0.2:5672): user 'user' authenticated and granted access to vhost '/'
celery-flower_1 | [W 200908 06:32:41 control:44] 'stats' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'active_queues' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'registered' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'scheduled' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'active' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'reserved' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'revoked' inspect method failed
celery-flower_1 | [W 200908 06:32:41 control:44] 'conf' inspect method failed

My docker-compose file:

version: "3.7"

services:
  rabbitmq:
    image: "bitnami/rabbitmq:3.7"
    ports:
      - "4000:4000"
      - "5672:5672"
    volumes:
      - "rabbitmq_data:/bitnami"

  redis:
    image: "bitnami/redis:5.0.4"
    environment:
      - REDIS_PASSWORD=password123
    ports:
      - "5000:5000"
    volumes:
      - "redis_data:/bitnami/redis/data"

  celery-flower:
    image: gregsi/latest-celery-flower-docker:latest
    environment:
      - AMQP_USERNAME=user
      - AMQP_PASSWORD=bitnami
      - AMQP_ADMIN_USERNAME=user
      - AMQP_ADMIN_PASSWORD=bitnami
      - AMQP_HOST=rabbitmq
      - AMQP_PORT=5672
      - AMQP_ADMIN_HOST=rabbitmq
      - AMQP_ADMIN_PORT=15672
      - FLOWER_BASIC_AUTH=user:test
    ports:
      - "5555:5555"
    depends_on:
      - rabbitmq
      - redis

  fastapi:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq
      - redis
    volumes:
      - "./:/app"
    command: "poetry run uvicorn app/app/main:app --bind 0.0.0.0:8000"

  worker:
    build: .
    depends_on:
      - rabbitmq
      - redis
    volumes:
      - "./:/app"
    command: "poetry run celery worker -A app.app.worker.celery_worker -l info -Q test-queue -c 1"

volumes:
  rabbitmq_data:
    driver: local
  redis_data:
    driver: local

My Celery app:

celery_app = Celery(
    "worker",
    backend="redis://:password123@redis:6379/0",
    broker="amqp://user:bitnami@rabbitmq:5672//"
)

celery_app.conf.task_routes = {
    "app.app.worker.celery_worker.compute_stock_indicators": "stocks-queue"
}

celery_app.conf.update(task_track_started=True)

The Celery worker:

@celery_app.task(acks_late=True)
def compute_stock_indicators(stocks: list, background_task):
    stocks_with_indicators = {}
    for stock in stocks:
        current_task.update_state(state=Actions.STARTED,
                                  meta={f"starting to fetch {stock}'s indicators"})

        # Fetch the stock's most recent indicators
        stock_indicators = fetch_stock_indicators(stock)
        current_task.update_state(state=Actions.FINISHED,
                                  meta={f"{stock}'s indicators fetched"})

        stocks_with_indicators.update({stock: stock_indicators})

    current_task.update_state(state=Actions.PROGRESS,
                              meta={f"predicting {stocks}s..."})

The FastAPI functions:

log = logging.getLogger(__name__)
rabbit = RabbitMQHandler(host='localhost', port=5672, level="DEBUG")
log.addHandler(rabbit)


def celery_on_message(body):
    """
    Logs the initiation of the function
    """
    log.warning(body)


def background_on_message(task):
    """
    Logs the function when it is added to the queue
    """
    log.warning(task.get(on_message=celery_on_message, propagate=False))


app = FastAPI(debug=True)


@app.post("/")
async def initiator(stocks: FrozenSet, background_task: BackgroundTasks):
    """
    :param stocks: stocks to be analyzed
    :type stocks: set
    :param background_task: initiate the tasks queue
    :type background_task: starlette.background.BackgroundTasks
    """
    log.warning(msg=f'beginning analysis on: {stocks}')
    task_name = "app.app.worker.celery_worker.compute_stock_indicators"

    task = celery_app.send_task(task_name, args=[stocks, background_task])
    background_task.add_task(background_on_message, task)
    return {"message": "Stocks indicators successfully calculated, stocks sent to prediction"}

Best answer

In your docker-compose file, under the worker service, the command reads:

command: "poetry run celery worker -A app.app.worker.celery_worker -l info -Q test-queue -c 1"

So essentially, you are asking the worker to "watch" a queue named test-queue.
But in celery_app, in this section:

celery_app.conf.task_routes = {
    "app.app.worker.celery_worker.compute_stock_indicators": "stocks-queue"
}

you are routing the task to a queue named stocks-queue. The worker never consumes from that queue, so the task is never picked up.

Change the queue name in either docker-compose or celery_app so that the two match.
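To illustrate, here is a minimal plain-Python sketch of the consistency check (not Celery API usage; the queue name stocks-queue, the module path, and the worker command are taken from the question, with -Q changed to match the route):

    # Route from the question's celery_app config: the task is sent to "stocks-queue".
    task_routes = {
        "app.app.worker.celery_worker.compute_stock_indicators": "stocks-queue"
    }

    # Corrected worker command: -Q must name the same queue the task is routed to
    # (the original compose file had "-Q test-queue", which no task is sent to).
    worker_command = (
        "poetry run celery worker -A app.app.worker.celery_worker "
        "-l info -Q stocks-queue -c 1"
    )

    # The queue the task is routed to must appear in the worker's -Q option.
    routed_queue = next(iter(task_routes.values()))
    assert f"-Q {routed_queue}" in worker_command

Alternatively, keep -Q test-queue and change the value in task_routes to "test-queue"; either direction works, as long as both sides agree.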

For "python - Docker, Celery: combined setup fails", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63788367/
