gpt4 book ai didi

python - 如何动态迭代上游任务的输出以在 Airflow 中创建并行任务?

转载 作者:太空狗 更新时间:2023-10-30 00:59:05 31 4
gpt4 key购买 nike

考虑以下 DAG 示例,其中第一个任务 get_id_creds 从数据库中提取凭据列表。此操作告诉我数据库中的哪些用户我能够对其运行进一步的数据预处理,并将这些 ID 写入文件 /tmp/ids.txt。然后,我将这些 ID 扫描到我的 DAG 中,并使用它们生成可以并行运行的 upload_transaction 任务列表。

我的问题是:是否有更符合惯用语的正确动态方法来使用 Airflow 来执行此操作?我这里的东西感觉笨拙和脆弱。如何将有效 ID 列表从一个流程直接传递到定义后续下游流程的流程?

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)

default_args = {
'start_date': datetime.now(),
'schedule_interval': None
}

DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)

get_id_creds = PythonOperator(
task_id='get_id_creds',
python_callable=dash_workers.get_id_creds,
provide_context=True,
dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
ids = infile.read().splitlines()

for uid in uids:
upload_transactions = PythonOperator(
task_id=uid,
python_callable=dash_workers.upload_transactions,
op_args=[uid],
dag=DAG)
upload_transactions.set_downstream(get_id_creds)

最佳答案

根据@Juan Riza 的建议,我检查了这个链接:Proper way to create dynamic workflows in Airflow .这几乎就是答案,尽管我能够充分简化解决方案,以至于我想我会在这里提供我自己的实现修改版本:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)

ENV = os.environ

default_args = {
# 'start_date': datetime.now(),
'start_date': datetime(2017, 7, 18)
}

DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)

clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)

def id_worker(uid):
return PythonOperator(
task_id=uid,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)

for uid in capone_dash_workers.get_id_creds():
clear_tables >> id_worker(uid)

clear_tables清理将作为该过程的结果重建的数据库。 id_worker是一个函数,它根据从 get_if_creds 返回的 ID 值数组动态生成新的预处理任务。 .任务 ID 就是相应的用户 ID,尽管它很可能是一个索引,i ,就像上面提到的例子一样。

注意 位移运算符 ( << ) 向后看,因为 clear_tables任务应该是第一位的,但在这种情况下似乎是可行的。

关于python - 如何动态迭代上游任务的输出以在 Airflow 中创建并行任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45314174/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com