I have a table in my DB where I'm logging some statuses.
If the status is ok for some date I should run sending_reports_if_success task, if it's not ok then sending_reports_if_failed.
我在数据库中有一个表,记录一些状态。如果某个日期的状态正常,我应该运行sending_reports_If_success任务,如果不正常,则sending_reports_If_failed。
How can I create a making choice task which will get result from
sql :
如何创建一个从sql中获得结果的决策任务:
select count(*) from r_summary rs
where rs.status = 'Not Ok'
and rs.created_date = '{start_task_time}';
And if count = 1 then I should run failure task, if count = 0 then sucess task
如果count=1,那么我应该运行失败任务,如果count=0,那么成功任务
更多回答
优秀答案推荐
BranchPythonOperator
is suitable for you case. You can pass the result from your query task to branching task and it will direct the flow to correspondingly using task ID.
BranchPythonOperator适合您的情况。您可以将查询任务的结果传递给分支任务,它将使用任务ID相应地引导流。
from airflow.operators.python import BranchPythonOperator
def choose_branch(query_result):
if query_result == 1:
return "sending_reports_if_success_task_id"
return "sending_reports_if_failed_task_id"
branching = BranchPythonOperator(
task_id='choose_report_branch',
python_callable=choose_branch,
op_args=["{{ ti.xcom_pull('your_query_task', 'your_return_key') }}"]
)
更多回答
Thanks, but how can write a task that will return a key? def extract_data(conn_id, **context): engine = get_conn(conn_id) df = pd.read_sql_query(f'''select * from r_summary rs where rs.status = 'OK' and rs.created_date = '{start_task_time}'::timestamp with time zone;''',con=engine) return df checking_exists_reports = PythonOperator( task_id='checking_exists_reports', python_callable=extract_data, op_kwargs={ 'load_date': '{{ts}}', 'conn_id' : 'postrgres' } )
谢谢,但是如何编写一个返回密钥的任务呢?def extract_data(conn_id,**context):engine=get_conn(conn_id)df=pd.read_sql_query(f''从r_summary rs中选择*,其中rs.status=“OK”和rs.created_date=“{start_task_time}”::带时区的时间戳;'',con=engine)返回df-checking_exists_reports=python操作员(task_id=“checking_exists-reports”,python_callable=extract_date,op_kwargs=“load_date”:“{{ts}}','conn_id':'postrgres'})
you can use context["task_instance"].xcom_push(key="your_query_task", value="your_value")
in extract_data
function to push desired value to xcom under your_query_task
key.
您可以在extract_data函数中使用context[“task_instance”].xcom_push(key=“your_query_task”,value=“your_value”)将所需值推送到your_query.task键下的xcom。
我是一名优秀的程序员,十分优秀!