gpt4 book ai didi

python-2.7 - 不通过使用 RabbitMQ 运行 celery 的 Airflow 执行的作业

转载 作者:行者123 更新时间:2023-12-03 09:59:47 24 4
gpt4 key购买 nike

下面是我使用的配置

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /root/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /root/airflow/dags

# The folder where airflow should store its log files. This location
base_log_folder = /root/airflow/logs

# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
#executor = SequentialExecutor
#executor = LocalExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/centos/airflow/airflow.db
sql_alchemy_conn = mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above


# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = pyamqp://guest:guest@XXX.XXX.XXX.XXX:5672/


# Another key Celery setting
celery_result_backend = db+mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 5556

# Default queue that tasks get assigned to and that worker listen on.
default_queue = = default

但是作业没有运行......调度程序显示它正在检查状态如下
[2017-05-11 05:09:13,070] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-13 00:00:00: scheduled__2015-06-13T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,072] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-14 00:00:00: scheduled__2015-06-14T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,074] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-15 00:00:00: scheduled__2015-06-15T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,076] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-16 00:00:00: scheduled__2015-06-16T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,078] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 04:46:29: manual__2017-05-10T04:46:28.756946, externally triggered: True>
[2017-05-11 05:09:13,080] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 05:08:20: manual__2017-05-10T05:08:20.252573, externally triggered: True>

Airflow UI 已启动并正在运行。 celery 花没有显示任何 worker 。 我的工作没有运行。

以下是我开始遵循的顺序。

Airflow 调度器

Airflow 网络服务器 -p 8080

Airflow worker

有什么我想念的吗?

最佳答案

在不知道您正在运行的 Airflow 版本以及您如何配置您的 rabbitmq-server 的情况下,很难肯定地回答您的问题。但是,我可以提供一些内容供您查看。

Here is the Celery documentation for specifying a broker URL .您的 airflow.cfg 中的经纪人 URL未指定虚拟主机,因此根据文档,将使用默认虚拟主机。我做了一些挖掘,但找不到 pyampq 的默认虚拟主机是什么,但这值得研究。

或者,您可以使用 rabbitmqctl 显式配置虚拟主机。 . Here is a post where someone lays out how to do this with Airflow .我已复制粘贴以下相关信息:

# Configure RabbitMQ: create user and grant privileges
rabbitmqctl add_user rabbitmq_user_name rabbitmq_password
rabbitmqctl add_vhost rabbitmq_virtual_host_name
rabbitmqctl set_user_tags rabbitmq_user_name rabbitmq_tag_name
rabbitmqctl set_permissions -p rabbitmq_virtual_host_name rabbitmq_user_name ".*" ".*" ".*"

最后,您可能会遇到与您使用的 Celery 版本有关的问题。在发布时,Celery 4.X.X 不能很好地与 Airflow 配合使用。尝试卸载 Celery 并重新安装一个有效的版本。
pip uninstall celery
pip install celery==3.1.7

关于python-2.7 - 不通过使用 RabbitMQ 运行 celery 的 Airflow 执行的作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43907040/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com