
kubernetes - Airflow error "dag_id could not be found" when running Airflow on the KubernetesExecutor


I am deploying Airflow with this Helm chart: https://github.com/apache/airflow/tree/master/chart

Apache Airflow version: 2.0.0

Kubernetes version: v1.19.4

What happened: I get this error when a task is executed with the KubernetesExecutor:

[2021-01-14 19:39:17,628] {dagbag.py:440} INFO - Filling up the DagBag from /opt/airflow/dags/repo/bash.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 216, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 189, in get_dag
    'parse.'.format(dag_id)
airflow.exceptions.AirflowException: dag_id could not be found: bash. Either the dag did not exist or it failed to parse.

How to reproduce it: deploy the Airflow Helm chart with this values.yaml:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
# Default values for airflow.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

# User and group of airflow user
uid: 50000
gid: 50000

# Airflow home directory
# Used for mount paths
airflowHome: "/opt/airflow"

# Default airflow repository -- overrides all the specific images below
defaultAirflowRepository: apache/airflow

# Default airflow tag to deploy
defaultAirflowTag: 2.0.0


# Select certain nodes for airflow pods.
nodeSelector: { }
affinity: { }
tolerations: [ ]

# Add common labels to all objects and pods defined in this chart.
labels: { }

# Ingress configuration
ingress:
  # Enable ingress resource
  enabled: false

  # Configs for the Ingress of the web Service
  web:
    # Annotations for the web Ingress
    annotations: { }

    # The path for the web Ingress
    path: ""

    # The hostname for the web Ingress
    host: ""

    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the web Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the web Ingress before the default path
    precedingPaths: [ ]

    # Http paths to add to the web Ingress after the default path
    succeedingPaths: [ ]

  # Configs for the Ingress of the flower Service
  flower:
    # Annotations for the flower Ingress
    annotations: { }

    # The path for the flower Ingress
    path: ""

    # The hostname for the flower Ingress
    host: ""

    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the flower Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the flower Ingress before the default path
    precedingPaths: [ ]

    # Http paths to add to the flower Ingress after the default path
    succeedingPaths: [ ]

# Network policy configuration
networkPolicies:
  # Enabled network policies
  enabled: false


# Extra annotations to apply to all
# Airflow pods
airflowPodAnnotations: { }

# Enable RBAC (default on most clusters these days)
rbacEnabled: true

# Airflow executor
# Options: SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor
executor: "KubernetesExecutor"

# If this is true and using LocalExecutor/SequentialExecutor/KubernetesExecutor, the scheduler's
# service account will have access to communicate with the api-server and launch pods.
# If this is true and using the CeleryExecutor, the workers will be able to launch pods.
allowPodLaunching: true

# Images
images:
  airflow:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  pod_template:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  flower:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  statsd:
    repository: apache/airflow
    tag: airflow-statsd-exporter-2020.09.05-v0.17.0
    pullPolicy: IfNotPresent
  redis:
    repository: redis
    tag: 6-buster
    pullPolicy: IfNotPresent
  pgbouncer:
    repository: apache/airflow
    tag: airflow-pgbouncer-2020.09.05-1.14.0
    pullPolicy: IfNotPresent
  pgbouncerExporter:
    repository: apache/airflow
    tag: airflow-pgbouncer-exporter-2020.09.25-0.5.0
    pullPolicy: IfNotPresent
  gitSync:
    repository: k8s.gcr.io/git-sync
    tag: v3.1.6
    pullPolicy: IfNotPresent

# Environment variables for all airflow containers
env:
  - name: "AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER"
    value: "65533"

# Secrets for all airflow containers
secret: [ ]
# - envName: ""
# secretName: ""
# secretKey: ""

# Extra secrets that will be managed by the chart
# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
# The format is "key/value" where
# * key (can be templated) is the name of the secret that will be created
# * value: an object with the standard 'data' or 'stringData' key (or both).
# The value associated with those keys must be a string (can be templated)
extraSecrets: { }
# eg:
# extraSecrets:
# {{ .Release.Name }}-airflow-connections:
# data: |
# AIRFLOW_CONN_GCP: 'base64_encoded_gcp_conn_string'
# AIRFLOW_CONN_AWS: 'base64_encoded_aws_conn_string'
# stringData: |
# AIRFLOW_CONN_OTHER: 'other_conn'
# {{ .Release.Name }}-other-secret-name-suffix: |
# data: |
# ...

# Extra ConfigMaps that will be managed by the chart
# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
# The format is "key/value" where
# * key (can be templated) is the name of the configmap that will be created
# * value: an object with the standard 'data' key.
# The value associated with this keys must be a string (can be templated)
extraConfigMaps: { }
# eg:
# extraConfigMaps:
# {{ .Release.Name }}-airflow-variables:
# data: |
# AIRFLOW_VAR_HELLO_MESSAGE: "Hi!"
# AIRFLOW_VAR_KUBERNETES_NAMESPACE: "{{ .Release.Namespace }}"

# Extra env 'items' that will be added to the definition of airflow containers
# a string is expected (can be templated).
extraEnv: ~
# eg:
# extraEnv: |
# - name: PLATFORM
# value: FR

# Extra envFrom 'items' that will be added to the definition of airflow containers
# A string is expected (can be templated).
extraEnvFrom: ~
# eg:
# extraEnvFrom: |
# - secretRef:
# name: '{{ .Release.Name }}-airflow-connections'
# - configMapRef:
# name: '{{ .Release.Name }}-airflow-variables'

# Airflow database config
data:
  # If secret names are provided, use those secrets
  metadataSecretName: ~
  resultBackendSecretName: ~

  # Otherwise pass connection values in
  metadataConnection:
    user: postgres
    pass: postgres
    host: ~
    port: 5432
    db: postgres
    sslmode: disable
  resultBackendConnection:
    user: postgres
    pass: postgres
    host: ~
    port: 5432
    db: postgres
    sslmode: disable

# Fernet key settings
fernetKey: ~
fernetKeySecretName: ~


# In order to use kerberos you need to create secret containing the keytab file
# The secret name should follow naming convention of the application where resources are
# name {{ .Release-name }}-<POSTFIX>. In case of the keytab file, the postfix is "kerberos-keytab"
# So if your release is named "my-release" the name of the secret should be "my-release-kerberos-keytab"
#
# The Keytab content should be available in the "kerberos.keytab" key of the secret.
#
# apiVersion: v1
# kind: Secret
# data:
# kerberos.keytab: <base64_encoded keytab file content>
# type: Opaque
#
#
# If you have such keytab file you can do it with similar
#
# kubectl create secret generic {{ .Release.name }}-kerberos-keytab --from-file=kerberos.keytab
#
kerberos:
  enabled: false
  ccacheMountPath: '/var/kerberos-ccache'
  ccacheFileName: 'cache'
  configPath: '/etc/krb5.conf'
  keytabPath: '/etc/airflow.keytab'
  principal: 'airflow@FOO.COM'
  reinitFrequency: 3600
  config: |
    # This is an example config showing how you can use templating and how "example" config
    # might look like. It works with the test kerberos server that we are using during integration
    # testing at Apache Airflow (see `scripts/ci/docker-compose/integration-kerberos.yml` but in
    # order to make it production-ready you must replace it with your own configuration that
    # Matches your kerberos deployment. Administrators of your Kerberos instance should
    # provide the right configuration.

    [logging]
    default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
    kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
    admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"

    [libdefaults]
    default_realm = FOO.COM
    ticket_lifetime = 10h
    renew_lifetime = 7d
    forwardable = true

    [realms]
    FOO.COM = {
      kdc = kdc-server.foo.com
      admin_server = admin_server.foo.com
    }

# Airflow Worker Config
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1

  # Allow KEDA autoscaling.
  # Persistence.enabled must be set to false to use KEDA.
  keda:
    enabled: false
    namespaceLabels: { }

    # How often KEDA polls the airflow DB to report new scale requests to the HPA
    pollingInterval: 5

    # How many seconds KEDA will wait before scaling to zero.
    # Note that HPA has a separate cooldown period for scale-downs
    cooldownPeriod: 30

    # Maximum number of workers created by keda
    maxReplicaCount: 10

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 100Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in KinD, due to usage
    # of local-path provisioner.
    fixPermissions: false

  kerberosSidecar:
    # Enable kerberos sidecar
    enabled: false

  resources: { }
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi

  # Grace period for tasks to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 600

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Annotations to add to worker kubernetes service account.
  serviceAccountAnnotations: { }
  # Mount additional volumes into worker.
  extraVolumes: [ ]
  extraVolumeMounts: [ ]

# Airflow scheduler settings
scheduler:
  # Airflow 2.0 allows users to run multiple schedulers,
  # However this feature is only recommended for MySQL 8+ and Postgres
  replicas: 1
  # Scheduler pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  resources: { }
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi

  # This setting can overwrite
  # podMutation settings.
  airflowLocalSettings: ~

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Annotations to add to scheduler kubernetes service account.
  serviceAccountAnnotations: { }

  # Mount additional volumes into scheduler.
  extraVolumes: [ ]
  extraVolumeMounts: [ ]

# Airflow webserver settings
webserver:
  allowPodLogReading: true
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5

  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5

  # Number of webservers
  replicas: 1

  # Additional network policies as needed
  extraNetworkPolicies: [ ]

  resources: { }
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi

  # Create initial user.
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: admin@example.com
    firstName: admin
    lastName: user
    password: admin

  # Mount additional volumes into webserver.
  extraVolumes: [ ]
  #  - name: airflow-ui
  #    emptyDir: { }
  extraVolumeMounts: [ ]
  #  - name: airflow-ui
  #    mountPath: /opt/airflow

  # This will be mounted into the Airflow Webserver as a custom
  # webserver_config.py. You can bake a webserver_config.py in to your image
  # instead
  webserverConfig: ~
  #  webserverConfig: |
  #    from airflow import configuration as conf

  #    # The SQLAlchemy connection string.
  #    SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')

  #    # Flask-WTF flag for CSRF
  #    CSRF_ENABLED = True

  service:
    type: NodePort
    ## service annotations
    annotations: { }

  # Annotations to add to webserver kubernetes service account.
  serviceAccountAnnotations: { }

# Flower settings
flower:
  # Additional network policies as needed
  extraNetworkPolicies: [ ]
  resources: { }
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi

  # A secret containing the connection
  secretName: ~

  # Else, if username and password are set, create secret from username and password
  username: ~
  password: ~

  service:
    type: ClusterIP

# Statsd settings
statsd:
  enabled: true
  # Additional network policies as needed
  extraNetworkPolicies: [ ]
  resources: { }
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi

  service:
    extraAnnotations: { }

# Pgbouncer settings
pgbouncer:
  # Enable pgbouncer
  enabled: false
  # Additional network policies as needed
  extraNetworkPolicies: [ ]

  # Pool sizes
  metadataPoolSize: 10
  resultBackendPoolSize: 5

  # Maximum clients that can connect to pgbouncer (higher = more file descriptors)
  maxClientConn: 100

  # Pgbouncer pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  # Limit the resources to pgbouncerExporter.
  # When you specify the resource request the scheduler uses this information to decide which node to place
  # the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so
  # that the running container is not allowed to use more of that resource than the limit you set.
  # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
  # Example:
  #
  #  resources:
  #    limits:
  #      cpu: 100m
  #      memory: 128Mi
  #    requests:
  #      cpu: 100m
  #      memory: 128Mi
  resources: { }

  service:
    extraAnnotations: { }

  # https://www.pgbouncer.org/config.html
  verbose: 0
  logDisconnections: 0
  logConnections: 0

  sslmode: "prefer"
  ciphers: "normal"

  ssl:
    ca: ~
    cert: ~
    key: ~

redis:
  terminationGracePeriodSeconds: 600

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 1Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:

  resources: { }
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi

  # If set use as redis secret
  passwordSecretName: ~
  brokerURLSecretName: ~

  # Else, if password is set, create secret with it,
  # else generate a new one on install
  password: ~

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

# Auth secret for a private registry
# This is used if pulling airflow images from a private registry
registry:
  secretName: ~

  # Example:
  #  connection:
  #    user: ~
  #    pass: ~
  #    host: ~
  #    email: ~
  connection: { }

# Elasticsearch logging configuration
elasticsearch:
  # Enable elasticsearch task logging
  enabled: true
  # A secret containing the connection
  # secretName: ~
  # Or an object representing the connection
  # Example:
  connection:
    # user:
    # pass:
    host: elasticsearch-master-headless.elk.svc.cluster.local
    port: 9200
  # connection: {}


# All ports used by chart
ports:
  flowerUI: 5555
  airflowUI: 8080
  workerLogs: 8793
  redisDB: 6379
  statsdIngest: 9125
  statsdScrape: 9102
  pgbouncer: 6543
  pgbouncerScrape: 9127

# Define any ResourceQuotas for namespace
quotas: { }

# Define default/max/min values for pods and containers in namespace
limits: [ ]

# This runs as a CronJob to cleanup old pods.
cleanup:
  enabled: false
  # Run every 15 minutes
  schedule: "*/15 * * * *"

# Configuration for postgresql subchart
# Not recommended for production
postgresql:
  enabled: true
  postgresqlPassword: postgres
  postgresqlUsername: postgres

# Config settings to go into the mounted airflow.cfg
#
# Please note that these values are passed through the `tpl` function, so are
# all subject to being rendered as go templates. If you need to include a
# literal `{{` in a value, it must be expressed like this:
#
# a: '{{ "{{ not a template }}" }}'
#
# yamllint disable rule:line-length
config:
  core:
    dags_folder: '{{ include "airflow_dags" . }}'
    load_examples: 'False'
    executor: '{{ .Values.executor }}'
    # For Airflow 1.10, backward compatibility
    colored_console_log: 'True'
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
  # Authentication backend used for the experimental API
  api:
    auth_backend: airflow.api.auth.backend.deny_all
  logging:
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    colored_console_log: 'True'
    logging_level: INFO
  metrics:
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
  webserver:
    enable_proxy_fix: 'True'
    expose_config: 'True'
    rbac: 'True'
  celery:
    default_queue: celery
  scheduler:
    scheduler_heartbeat_sec: 5
    # For Airflow 1.10, backward compatibility
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
    # Restart Scheduler every 41460 seconds (11 hours 31 minutes)
    # The odd time is chosen so it is not always restarting on the same "hour" boundary
    run_duration: 41460
  elasticsearch:
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
  elasticsearch_configs:
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
  kerberos:
    keytab: '{{ .Values.kerberos.keytabPath }}'
    reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
    principal: '{{ .Values.kerberos.principal }}'
    ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
  kubernetes:
    namespace: '{{ .Release.Namespace }}'
    airflow_configmap: '{{ include "airflow_config" . }}'
    airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    delete_worker_pods: 'False'
    multi_namespace_mode: '{{ if .Values.multiNamespaceMode }}True{{ else }}False{{ end }}'
# yamllint enable rule:line-length

multiNamespaceMode: false

podTemplate:

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName: gp2
    # access mode of the persistent volume
    accessMode: ReadWriteMany
    ## the name of an existing PVC to use
    existingClaim: "airflow-dags"
  gitSync:
    enabled: true
    repo: git@github.com:Tikna-inc/airflow.git
    branch: main
    rev: HEAD
    root: "/git"
    dest: "repo"
    depth: 1
    maxFailures: 0
    subPath: ""
    sshKeySecret: airflow-ssh-secret
    wait: 60
    containerName: git-sync
    uid: 65533
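
The gitSync block above references sshKeySecret: airflow-ssh-secret, so a Secret holding the Git SSH private key has to exist in the release namespace before the DAGs can sync. A minimal sketch of such a Secret, assuming the key material is stored under the gitSshKey key (that is the subPath the worker pod mounts further down); the actual key content is yours:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-ssh-secret
  namespace: airflow
type: Opaque
data:
  # base64-encoded private deploy key for git@github.com:Tikna-inc/airflow.git
  gitSshKey: <base64-encoded SSH private key>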

This is the DAG with the task:

import logging
from datetime import timedelta

import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

logging.getLogger().setLevel(level=logging.INFO)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def get_active_customers():
    requests.get("localhost:8080")


dag = DAG(
    'bash',
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval='*/2 * * * *',
    start_date=days_ago(1),
    tags=['Test'],
    is_paused_upon_creation=False,
    catchup=False
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='mkdir ./itsMe',
    dag=dag
)

t1

This is the airflow.cfg file:

[api]
auth_backend = airflow.api.auth.backend.deny_all

[celery]
default_queue = celery

[core]
colored_console_log = True
dags_folder = /opt/airflow/dags/repo/
executor = KubernetesExecutor
load_examples = False
remote_logging = False

[elasticsearch]
json_format = True
log_id_template = {dag_id}_{task_id}_{execution_date}_{try_number}

[elasticsearch_configs]
max_retries = 3
retry_timeout = True
timeout = 30

[kerberos]
ccache = /var/kerberos-ccache/cache
keytab = /etc/airflow.keytab
principal = airflow@FOO.COM
reinit_frequency = 3600

[kubernetes]
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
dags_in_image = False
delete_worker_pods = False
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = apache/airflow
worker_container_tag = 2.0.0

[logging]
colored_console_log = True
logging_level = INFO
remote_logging = False

[metrics]
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

[scheduler]
run_duration = 41460
scheduler_heartbeat_sec = 5
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

[webserver]
enable_proxy_fix = True
expose_config = True

This is the pod YAML for the new task:

apiVersion: v1
kind: Pod
metadata:
  annotations:
    dag_id: bash2
    execution_date: "2021-01-14T20:16:00+00:00"
    kubernetes.io/psp: eks.privileged
    task_id: create_dir
    try_number: "2"
  labels:
    airflow-worker: "38"
    airflow_version: 2.0.0
    dag_id: bash2
    execution_date: 2021-01-14T20_16_00_plus_00_00
    kubernetes_executor: "True"
    task_id: create_dir
    try_number: "2"
  name: sss3
  namespace: airflow
spec:
  containers:
    - args:
        - airflow
        - tasks
        - run
        - bash2
        - create_dir
        - "2021-01-14T20:16:00+00:00"
        - --local
        - --pool
        - default_pool
        - --subdir
        - /opt/airflow/dags/repo/bash.py
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              key: fernet-key
              name: airflow-fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              key: connection
              name: airflow-airflow-metadata
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              key: connection
              name: airflow-airflow-metadata
        - name: AIRFLOW_IS_K8S_EXECUTOR_POD
          value: "True"
      image: apache/airflow:2.0.0
      imagePullPolicy: IfNotPresent
      name: base
      resources: { }
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /opt/airflow/logs
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: config
          readOnly: true
          subPath: airflow.cfg
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          subPath: ssh
        - mountPath: /opt/airflow/dags
          name: dags
          readOnly: true
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: airflow-worker-token-7sdtr
          readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  initContainers:
    - env:
        - name: GIT_SSH_KEY_FILE
          value: /etc/git-secret/ssh
        - name: GIT_SYNC_SSH
          value: "true"
        - name: GIT_KNOWN_HOSTS
          value: "false"
        - name: GIT_SYNC_REV
          value: HEAD
        - name: GIT_SYNC_BRANCH
          value: main
        - name: GIT_SYNC_REPO
          value: git@github.com:Tikna-inc/airflow.git
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: /git
        - name: GIT_SYNC_DEST
          value: repo
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_WAIT
          value: "60"
        - name: GIT_SYNC_MAX_SYNC_FAILURES
          value: "0"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      image: k8s.gcr.io/git-sync:v3.1.6
      imagePullPolicy: IfNotPresent
      name: git-sync
      resources: { }
      securityContext:
        runAsUser: 65533
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /git
          name: dags
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          readOnly: true
          subPath: gitSshKey
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: airflow-worker-token-7sdtr
          readOnly: true
  nodeName: ip-172-31-41-37.eu-south-1.compute.internal
  priority: 0
  restartPolicy: Never
  schedulerName: default-scheduler
  securityContext:
    runAsUser: 50000
  serviceAccount: airflow-worker
  serviceAccountName: airflow-worker
  terminationGracePeriodSeconds: 30
  tolerations:
    - effect: NoExecute
      key: node.kubernetes.io/not-ready
      operator: Exists
      tolerationSeconds: 300
    - effect: NoExecute
      key: node.kubernetes.io/unreachable
      operator: Exists
      tolerationSeconds: 300
  volumes:
    - emptyDir: { }
      name: dags
    - name: git-sync-ssh-key
      secret:
        defaultMode: 288
        secretName: airflow-ssh-secret
    - emptyDir: { }
      name: airflow-logs
    - configMap:
        defaultMode: 420
        name: airflow-airflow-config
      name: config
    - name: airflow-worker-token-7sdtr
      secret:
        defaultMode: 420
        secretName: airflow-worker-token-7sdtr

-------------------- IMPORTANT --------------------

Debugging

For debugging purposes, I changed the pod args so that, instead of running the task, the pod runs the webserver:

spec:
  containers:
    - args:
        - airflow
        - webserver

I then looked for the DAGs there, but found none. gitSync does not seem to work for the pods launched by the KubernetesExecutor.
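
A similar check, assuming you are editing a copy of the worker pod manifest yourself, is to override the container command so the pod simply idles, and then exec into it to list the synced repo (the /opt/airflow/dags/repo path comes from the git-sync root/dest settings above; this is only a debugging sketch, not part of the chart):

spec:
  containers:
    - name: base
      image: apache/airflow:2.0.0
      # bypass the airflow entrypoint and keep the container alive for inspection
      command: ["bash", "-c", "sleep 3600"]
      # then: kubectl exec -n airflow <pod-name> -- ls -la /opt/airflow/dags/repo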

Any help???

Best Answer

I think I found the root cause of this issue. In the Airflow Helm chart I saw this snippet:

- name: git-sync-ssh-key
  secret:
    secretName: {{ .Values.dags.gitSync.sshKeySecret }}
    defaultMode: 288

That defaultMode is what causes the problem. If I change defaultMode to 420, the DAGs are synced from the GitHub repository.
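
For reference, the patched volume definition would look roughly like this (420 decimal corresponds to mode 0644, while the original 288 is 0440; a sketch of the change described above, not the chart's upstream default):

- name: git-sync-ssh-key
  secret:
    secretName: {{ .Values.dags.gitSync.sshKeySecret }}
    defaultMode: 420  # 0644 in octal; was 288 (0440)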

Related issue: https://github.com/kubernetes/git-sync/issues/249

Regarding kubernetes - Airflow error "dag_id could not be found" when running Airflow on the KubernetesExecutor, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/65727196/
