airflow - Oops... AttributeError when clearing failed task state in Airflow


I am trying to clear a failed task so that it will run again.

I usually do this using the web GUI from the Tree View:

[Screenshot: tree view showing failed task and the clear pop-up]

After selecting "Clear", I am directed to an error page:

[Screenshot: error page]

The traceback on this page is the same error I receive when trying to clear this task using the CLI:

    [u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t coverage_check gom_modis_aqua_coverage_check
    [2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
    [2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
    Traceback (most recent call last):
      File "/usr/bin/airflow", line 28, in <module>
        args.func(args)
      File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
        include_upstream=args.upstream,
      File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
        dag = copy.deepcopy(self)
      File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
        y = copier(memo)
      File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
        setattr(result, k, copy.deepcopy(v, memo))
      File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
        y = copier(x, memo)
      File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
        y[deepcopy(key, memo)] = deepcopy(value, memo)
      File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
        y = copier(memo)
      File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
        setattr(result, k, copy.deepcopy(v, memo))
      File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
        y = copier(x, memo)
      File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
        y[deepcopy(key, memo)] = deepcopy(value, memo)
      File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
        y = _reconstruct(x, rv, 1, memo)
      File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
        y.__dict__.update(state)
    AttributeError: 'NoneType' object has no attribute 'update'
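
The traceback shows where clear fails: sub_dag in models.py calls copy.deepcopy(self) on the DAG, and the copy dies partway down the DAG's object graph. As a toy illustration (hypothetical classes, not the Airflow code path), a deepcopy of a DAG-like container fails as soon as any attribute it reaches cannot be copied; the exact exception depends on the offending object, a thread lock below versus whatever produced the AttributeError above:

    import copy
    import threading

    # Hypothetical stand-ins, not Airflow classes: an "operator" holding an
    # object that refuses to be deep-copied (a lock here; the principle is
    # the same for clients, sockets, and other stateful handles).
    class FakeOperator:
        def __init__(self):
            self.client = threading.Lock()

    class FakeDag:
        def __init__(self):
            self.task_dict = {'coverage_check': FakeOperator()}

    # Raises TypeError: can't pickle _thread.lock objects (exact message
    # varies by Python version); deepcopy has no way to clone the lock.
    copy.deepcopy(FakeDag())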

Looking for ideas on what might be causing this, how I should fix this task, and how to avoid it in the future.

I was able to work around the issue by deleting the task record using the "Browse > Task Instances" search, but I would still like to dig into the problem because I have seen it several times now.
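
For reference, a hypothetical script equivalent of that "Browse > Task Instances" deletion, assuming the Airflow 1.x metadata session and TaskInstance model (this is my own sketch, not part of the original workaround; back up the metadata DB before deleting rows):

    from airflow import settings
    from airflow.models import TaskInstance

    # Remove the stuck task-instance rows directly from the metadata DB;
    # the dag_id/task_id are the ones from this question.
    session = settings.Session()
    session.query(TaskInstance).filter(
        TaskInstance.dag_id == 'gom_modis_aqua_coverage_check',
        TaskInstance.task_id == 'coverage_check',
    ).delete(synchronize_session=False)
    session.commit()
    session.close()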

Although my DAG code is growing increasingly complex, here is an excerpt from where the operator is defined in the DAG:
    trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
    coverage_check = BranchPythonOperator(
        task_id='coverage_check',
        python_callable=_coverage_check,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(hours=3),
        queue=QUEUE.PYCMR,
        op_kwargs={
            'roi': region,
            'success_branch_id': trigger_granule_dag_id
        }
    )

The full source can be browsed at github/USF-IMARS/imars_dags. Here are links to the most relevant sections:
  • the operator instantiated in /gom/gom_modis_aqua_coverage_check.py using the modis_aqua_coverage_check factory
  • the factory function defining the coverage_check BranchPythonOperator in /builders/modis_aqua_coverage_check.py
  • the python_callable, the _coverage_check function in the same file
Best Answer

Below is a sample DAG I created to simulate the error you are facing.

    import logging
    import os
    from datetime import datetime, timedelta

    import boto3
    from airflow import DAG
    from airflow import configuration as conf
    from airflow.operators import ShortCircuitOperator, PythonOperator, DummyOperator


    def athena_data_validation(**kwargs):
        pass


    start_date = datetime.now()

    args = {
        'owner': 'airflow',
        'start_date': start_date,
        'depends_on_past': False,
        'wait_for_downstream': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=30)
    }

    dag_name = 'data_validation_dag'

    schedule_interval = None

    dag = DAG(
        dag_id=dag_name,
        default_args=args,
        schedule_interval=schedule_interval)

    # boto3 client created at module (DAG-definition) level -- this object
    # is what later breaks the deepcopy performed by "clear"
    athena_client = boto3.client('athena', region_name='us-west-2')

    DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

    start_task = DummyOperator(task_id='Start_Task', dag=dag)

    end_task = DummyOperator(task_id='End_Task', dag=dag)

    data_validation_task = ShortCircuitOperator(
        task_id='Data_Validation',
        provide_context=True,
        python_callable=athena_data_validation,
        op_kwargs={
            'athena_client': athena_client,
            'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
            's3_output_path': 's3://XXX/YYY/'
        },
        dag=dag)
    data_validation_task.set_upstream(start_task)
    data_validation_task.set_downstream(end_task)

After one successful run, I tried to clear the Data_Validation task and got the same error as above.

I removed the athena_client object creation from the module level and placed it inside the athena_data_validation function, and then it worked. So when we do a clear in the Airflow UI, it tries to do a deepcopy and get all the objects from the previous run. I am still trying to understand why it is not able to get a copy of the object type, but I have a workaround that worked for me.
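
A minimal sketch of that workaround applied to the example DAG above, assuming the same Airflow 1.x imports: the boto3 client is created at task-execution time instead of DAG-definition time, so the deepcopy triggered by clear never encounters it:

    from datetime import datetime

    import boto3
    from airflow import DAG
    from airflow.operators import ShortCircuitOperator


    def athena_data_validation(**kwargs):
        # The client is built when the task runs, not when the DAG file is
        # parsed, so it is never an attribute reachable from the DAG object.
        athena_client = boto3.client('athena', region_name='us-west-2')
        # ... validation logic using athena_client goes here ...
        return True


    dag = DAG(
        dag_id='data_validation_dag',
        start_date=datetime(2018, 1, 1),
        schedule_interval=None)

    data_validation_task = ShortCircuitOperator(
        task_id='Data_Validation',
        provide_context=True,
        python_callable=athena_data_validation,
        dag=dag)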

Regarding airflow - Oops... AttributeError when clearing failed task state in Airflow, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48285790/
