gpt4 book ai didi

python - 调用函数后,GCP Composer/Airflow 无法识别 DAG

转载 作者:太空宇宙 更新时间:2023-11-03 20:47:43 25 4
gpt4 key购买 nike

我有一个函数可以从 BigQuery 数据集中获取表列表:

def get_table_names(**kwargs):

client = bigquery.Client()

# get source tables

source_tables = []

for table in client.list_tables(
Template('$project.$dataset').substitute(project=SOURCE_PROJECT, dataset=SOURCE_DATASET)):

if table.table_id.startswith(TABLE_PREFIX):
source_tables.append(table.table_id)

logging.info(str(len(source_tables)) + ' tables scheduled to move')

return source_tables

我最初是在 PythonOperator 类型的任务中调用此函数,并且 - 尽管我没有返回值 - 它运行良好,并注销了“计划移动的 524 个表”。

我现在将其称为 dag 设置的一部分,以便我可以实例化每个表的任务(我尚未编写这部分):

table_tasks = get_table_names()

但是一旦我调用它,Composer/Airflow Web 界面就停止识别 DAG - 它仍然列出,如果我单击重新加载图标,我会收到通常的“新鲜如雏菊消息”,但如果我尝试去进入 DAG 我得到:

DAG "GA360_Replication" seems to be missing

最佳答案

DAG 丢失的最可能原因是代码中的错误导致调度程序无法拾取 DAG。您还可以检查是否有 2 个具有相同 DAG 名称的 .py 文件。当您替换上传具有不同名称但相同 DAG 名称的 .py 文件时(即使您删除了以前的 .py 文件),我也看到了这种情况发生。如果不检查环境/日志,很难排除故障,但我认为这些是最有可能的情况。欢迎contact support如果您仍然遇到此问题。

无论如何,我制作的 DAG 在 Composer 1.7.1 Airflow 1.10.2 和 Python3 中都能正常工作。阅读问题和代码,感觉您想要将列表传递到表中以进行下一个任务,因此我添加了一个仅使用 XCOM 打印它们的列表。 :

import datetimeimport osimport airflowfrom airflow import modelsfrom airflow.operators import python_operatorfrom google.cloud import bigqueryimport timeimport logging

default_dag_args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(1)}

with models.DAG( 'test_table_xcom', default_args=default_dag_args, schedule_interval = "@daily") as dag:

TABLE_PREFIX = 'test'
SOURCE_PROJECT = <PROJECT>
SOURCE_DATASET = <DATASET>

def get_table_names(**kwargs):

client = bigquery.Client()
source_tables = []


dataset = '{}.{}'.format(SOURCE_PROJECT,SOURCE_DATASET)

for table in client.list_tables(dataset):
if table.table_id.startswith(TABLE_PREFIX):
source_tables.append(table.table_id)

logging.info('{} tables scheduled to move'.format(len(source_tables)))
return source_tables

def print_tables(**kwargs):
ti = kwargs['ti']
tables_list = ti.xcom_pull(task_ids='list_tables')
for table in tables_list:
print(table)


listTables = python_operator.PythonOperator(task_id='list_tables',python_callable=get_table_names, provide_context=True)
tablePrint = python_operator.PythonOperator(task_id='print_tables',python_callable=print_tables, provide_context=True)

listTables >> tablePrint

最后但并非最不重要的一点是,请注意 Airflow 本身并不是用于执行 ETL 操作,而是用于安排这些操作。不建议使用 XCOM(如文档所示 *),因为它可能会使在 Airflow/Composer 下运行的数据库(在本例中为 Cloud SQL)过载。对于您要传输表名称列表的特殊情况,我认为这不会成为问题,但最好了解此建议。

*if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator.

关于python - 调用函数后,GCP Composer/Airflow 无法识别 DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56464019/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com