gpt4 book ai didi

How can I make a choce between my tasks in airflow?(如何才能在气流中完成任务?)

转载 作者:bug小助手 更新时间:2023-10-22 16:39:03 27 4
gpt4 key购买 nike



I have a table in my DB where I'm logging some statuses.
If the status is ok for some date I should run sending_reports_if_success task, if it's not ok then sending_reports_if_failed.

我在数据库中有一个表,记录一些状态。如果某个日期的状态正常,我应该运行sending_reports_If_success任务,如果不正常,则sending_reports_If_failed。


How can I create a making choice task which will get result from
sql :

如何创建一个从sql中获得结果的决策任务:


select count(*) from r_summary rs
where rs.status = 'Not Ok'
and rs.created_date = '{start_task_time}';

And if count = 1 then I should run failure task, if count = 0 then sucess task

如果count=1,那么我应该运行失败任务,如果count=0,那么成功任务


Graph


更多回答
优秀答案推荐

BranchPythonOperator is suitable for you case. You can pass the result from your query task to branching task and it will direct the flow to correspondingly using task ID.

BranchPythonOperator适合您的情况。您可以将查询任务的结果传递给分支任务,它将使用任务ID相应地引导流。


from airflow.operators.python import BranchPythonOperator


def choose_branch(query_result):
if query_result == 1:
return "sending_reports_if_success_task_id"

return "sending_reports_if_failed_task_id"

branching = BranchPythonOperator(
task_id='choose_report_branch',
python_callable=choose_branch,
op_args=["{{ ti.xcom_pull('your_query_task', 'your_return_key') }}"]
)

更多回答

Thanks, but how can write a task that will return a key? def extract_data(conn_id, **context): engine = get_conn(conn_id) df = pd.read_sql_query(f'''select * from r_summary rs where rs.status = 'OK' and rs.created_date = '{start_task_time}'::timestamp with time zone;''',con=engine) return df checking_exists_reports = PythonOperator( task_id='checking_exists_reports', python_callable=extract_data, op_kwargs={ 'load_date': '{{ts}}', 'conn_id' : 'postrgres' } )

谢谢,但是如何编写一个返回密钥的任务呢?def extract_data(conn_id,**context):engine=get_conn(conn_id)df=pd.read_sql_query(f''从r_summary rs中选择*,其中rs.status=“OK”和rs.created_date=“{start_task_time}”::带时区的时间戳;'',con=engine)返回df-checking_exists_reports=python操作员(task_id=“checking_exists-reports”,python_callable=extract_date,op_kwargs=“load_date”:“{{ts}}','conn_id':'postrgres'})

you can use context["task_instance"].xcom_push(key="your_query_task", value="your_value") in extract_data function to push desired value to xcom under your_query_task key.

您可以在extract_data函数中使用context[“task_instance”].xcom_push(key=“your_query_task”,value=“your_value”)将所需值推送到your_query.task键下的xcom。

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com