gpt4 book ai didi

docker - 如何使用 docker-compose 在分布式 Airflow 架构上配置 celery worker?

转载 作者:行者123 更新时间:2023-12-04 13:08:33 24 4
gpt4 key购买 nike

我正在建立一个分布式 Airflow 集群,其中除了 celery 工作人员之外的所有其他内容都在一台主机上运行,​​处理在多台主机上完成。 airflow2.0 设置是使用 Airflow 文档 https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml 中给出的 yaml 文件配置的。 .在我最初的测试中,当我在同一台主机上运行所有东西时,我让架构很好地工作。问题是,如何在远程主机上启动 celery 工作人员?
到目前为止,我尝试创建上述 docker-compose 的修剪版本,我只在工作主机上启动 celery 工作人员,没有其他任何事情。但是我遇到了数据库连接的一些问题。在修剪版本中,我更改了 URL,以便它们指向运行 db 和 redis 的主机。
dags、日志、插件和 postgresql 数据库位于所有主机可见的共享驱动器上。
我应该如何进行配置?任何想法要检查什么?连接等?
Celery worker docker-compose 配置:

---
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment:
&airflow-common-env
AIRFLOW_UID: 50000
AIRFLOW_GID: 50000
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
REDIS_PORT: 6380
volumes:
- /airflow/dev/dags:/opt/airflow/dags
- /airflow/dev/logs:/opt/airflow/logs
- /airflow/dev/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
airflow-remote-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
编辑 1:
我在处理日志文件时仍然遇到一些困难。看来共享日志目录并不能解决丢失日志文件的问题。我像建议的那样在 main 上添加了 extra_host 定义,并在工作机器上打开了端口 8793。
工作任务失败并显示日志:
*** Log file does not exist: 
/opt/airflow/logs/tutorial/print_date/2021-07-
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''

最佳答案

这些设置远非“终极设置”,而是在核心节点和工作程序中使用来自 Airflow 的 docker-compose 对我有用的一些设置:
主节点:

  • 必须可以从 Webserver 所在的主节点访问工作节点。运行。我找到了 this diagramCeleryExecutor架构非常有助于解决问题。
    在尝试读取日志时,如果在本地找不到它们,它将尝试从远程 worker 那里检索它们。因此,您的主节点可能不知道您的工作人员的主机名,因此您可以更改主机名的解析方式( hostname_callable 设置,默认为 socket.getfqdn ),或者您只需将名称解析功能添加到 Webserver .这可以通过添加 extra_hosts 来完成。 x-airflow-common 中的配置键定义:

  • ---
    version: "3"
    x-airflow-common: &airflow-common
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
    environment: &airflow-common-env
    ...# env vars
    extra_hosts:
    - "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
    - "worker-02-hostname:worker-02-ip-address"
    *请注意,在您拥有共享驱动器的特定情况下,我认为日志将在本地找到。
  • 定义并行性、DAG 并发性和调度程序解析过程。可以通过使用环境变量来完成:

  • x-airflow-common: &airflow-common
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
    environment: &airflow-common-env
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
    当然,要设置的值取决于您的具体情况和可用资源。 This article对该主题有一个很好的概述。 DAG 设置也可以在 DAG 处被覆盖定义。
    工作节点:
  • 定义 worker CELERY__WORKER_CONCURRENCY ,默认值可能是机器上可用的 CPU 数量( docs )。
  • 定义如何访问主节点中运行的服务。设置 IP 或主机名并注意主节点中匹配的暴露端口:

  • x-airflow-common: &airflow-common
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
    environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
  • 分享同一个 Fernet KeySecret Key从“.env”文件中读取它们:

  •   environment: &airflow-common-env
    AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
    AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}

    env_file:
    - .env
    .env 文件: FERNET_KEY=jvYUaxxxxxxxxxxxxx=
  • 它是 关键 集群中的每个节点(主节点和工作节点)都应用了相同的设置。
  • 为工作服务定义一个主机名,以避免自动生成匹配容器 ID。
  • 公开端口 8793,这是用于从工作程序 ( docs ) 获取日志的默认端口:

  • services:
    airflow-worker:
    <<: *airflow-common
    hostname: ${HOSTNAME}
    ports:
    - 8793:8793
    command: celery worker
    restart: always
  • 确保每个工作节点主机都以相同的时间配置运行,几分钟的差异可能会导致严重的执行错误,而这些错误可能不太容易发现。考虑在主机操作系统上启用 NTP 服务。

  • 如果您有繁重的工作负载和高并发性,您可能需要调整 Postgres 设置,例如 max_connectionsshared_buffers .这同样适用于主机操作系统网络设置,例如 ip_local_port_rangesomaxconn .
    在我在初始集群设置过程中遇到的任何问题中, Flower并且工作程序执行日志始终提供有用的详细信息和错误消息,包括任务级日志和 Docker-Compose 服务日志,即: docker-compose logs --tail=10000 airflow-worker > worker_logs.log .
    希望对你有用!

    关于docker - 如何使用 docker-compose 在分布式 Airflow 架构上配置 celery worker?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68194327/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com