
python - How do I create a dynamic number of tasks based on the result of a previous task in the same DAG?


I want to create a dynamic number of tasks based on the number of chunks obtained when reading a CSV in a previous task. On top of that, I want to run all of those dynamic tasks in parallel and, once they have finished, run a final task. All of this inside the same DAG.

To make the picture clearer, I created a diagram:

Code:

# python imports
import awswrangler as wr
import boto3
import pandas as pd
from datetime import datetime
# airflow imports
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

# First PythonOperator
def read_csv(s3_dir, client_s3, **kwargs):
    # chunksize makes read_csv return an iterator of DataFrames
    df_chunks = wr.s3.read_csv(s3_dir, sep='|', chunksize=100, boto3_session=client_s3)

    intermediate_dir = Variable.get("s3_intermediate_dir")

    number_of_chunks = 0
    for idx, df in enumerate(df_chunks):
        wr.s3.to_csv(df, intermediate_dir + "intermediate_{}".format(idx))
        number_of_chunks = idx + 1

    # push the chunk count so downstream tasks can pull it from XCom
    kwargs['ti'].xcom_push(key='number_of_chunks', value=number_of_chunks)

# * PythonOperator (one instance per chunk)
def transform_dataframes(s3_dir, client_s3, **kwargs):
    # DO SOMETHING
    pass

# Final PythonOperator
def agg_df_one(s3_dir, client_s3, **kwargs):
    # list all intermediate files
    all_files = wr.s3.list_objects(s3_dir, boto3_session=client_s3)

    # read the intermediate chunks and aggregate them into one DataFrame
    df_list = [wr.s3.read_csv(file, sep='|', boto3_session=client_s3) for file in all_files]
    final_df = pd.concat(df_list, axis=0, ignore_index=True)

    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y-%H")

    # save the final DataFrame to S3
    final_dir = Variable.get("s3_final_dir")
    wr.s3.to_csv(final_df, final_dir + dt_string + "/final_df.csv")

    # delete the intermediate files
    wr.s3.delete_objects(all_files, boto3_session=client_s3)

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'retries': 0,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    schedule_interval='@hourly',
    catchup=False,
    max_active_runs=3,
    concurrency=10,
)

client_s3 = boto3.Session(
    aws_access_key_id=Variable.get("s3_key_id"),
    aws_secret_access_key=Variable.get("s3_secret_key"),
)

wr.config.s3_endpoint_url = Variable.get("s3_endpoint_url")

read_csv_task = PythonOperator(
    task_id='read_csv_func',
    python_callable=read_csv,
    provide_context=True,
    op_kwargs={'s3_dir': Variable.get("init_s3_dir"), 'client_s3': client_s3},
    dag=dag,
)

# This is the part that does not work: the Jinja template below is just a
# string at DAG-parsing time, so this loop cannot create one task per chunk.
transform_dataframes_task = [PythonOperator(
    task_id=f'test_parallelism_dag.transform_dataframe_chunk_{i}',
    python_callable=transform_dataframes,
    provide_context=True,
    dag=dag,
) for i in "{{ ti.xcom_pull(key='number_of_chunks') }}"]


agg_df_one_task = PythonOperator(
    task_id='agg_df_one_func',
    python_callable=agg_df_one,
    provide_context=True,
    op_kwargs={'s3_dir': Variable.get("s3_intermediate_dir"), 'client_s3': client_s3},
    trigger_rule='none_failed',
    dag=dag,
)

read_csv_task >> transform_dataframes_task >> agg_df_one_task

My question is: how do I pass the number of chunks to the "transform_dataframes_task" group so that all the needed tasks get created?

Thanks for your ideas!

Best Answer

This is not possible today, but it will be soon.

We have just approved an AIP (Airflow Improvement Proposal) in Apache Airflow that will enable this pattern - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42%3A+Dynamic+Task+Mapping - and it is very likely to be implemented in Airflow 2.3.
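To give an idea of what the AIP proposes, here is a minimal sketch of the pattern above using the Dynamic Task Mapping API described in AIP-42 (shipped with Airflow 2.3). The task names, the returned chunk paths, and the splitting logic are illustrative placeholders, not part of the original question's code:

# Sketch only: assumes Airflow >= 2.3 (Dynamic Task Mapping, AIP-42).
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@hourly', start_date=datetime(2022, 1, 1), catchup=False)
def test_parallelism_dag_mapped():

    @task
    def read_csv():
        # Split the source CSV into chunks and return one S3 path per chunk
        # (the real splitting logic from the question would go here).
        return ["s3://bucket/intermediate_0", "s3://bucket/intermediate_1"]

    @task
    def transform_chunk(chunk_path):
        # One mapped task instance is created per element of the list
        # returned by read_csv, and the instances run in parallel.
        ...

    @task
    def agg_chunks(transformed_chunks):
        # Runs once, after every mapped transform_chunk instance has finished.
        ...

    chunk_paths = read_csv()
    transformed = transform_chunk.expand(chunk_path=chunk_paths)
    agg_chunks(transformed)

dag = test_parallelism_dag_mapped()

Here transform_chunk.expand(...) replaces the list comprehension over the Jinja string in the question: the number of mapped task instances is decided at run time from the value returned by the upstream task.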

Regarding python - How do I create a dynamic number of tasks based on the result of a previous task in the same DAG?, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/70135958/
