gpt4 book ai didi

Airflow:如何从 BigQueryOperator 插入 xcom 值(value)?

转载 作者:行者123 更新时间:2023-12-02 13:33:27 26 4
gpt4 key购买 nike

这是我的运算符(operator):

bigquery_check_op = BigQueryOperator(
task_id='bigquery_check',
bql=SQL_QUERY,
use_legacy_sql = False,
bigquery_conn_id=CONNECTION_ID,
trigger_rule='all_success',
xcom_push=True,
dag=dag
)

当我检查 UI 中的渲染页面时。那里什么也没有出现。当我在控制台中运行 SQL 时,它返回正确的值 1400。为什么运营商不推送XCOM?

我无法使用BigQueryValueCheckOperator。该运算符被设计为在值检查时失败。我不想让任何事情失败。我只是想根据查询的返回值对代码进行分支。

最佳答案

以下是您如何使用 BigQueryHook 和 BranchPythonOperator 来完成此任务:

from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.hooks import BigQueryHook

def big_query_check(**context):
sql = context['templates_dict']['sql']
bq = BigQueryHook(bigquery_conn_id='default_gcp_connection_id',
use_legacy_sql=False)
conn = bq.get_conn()
cursor = conn.cursor()
results = cursor.execute(sql)

# Do something with results, return task_id to branch to
if results == 0:
return "task_a"
else:
return "task_b"


sql = "SELECT COUNT(*) FROM sales"


branching = BranchPythonOperator(
task_id='branching',
python_callable=big_query_check,
provide_context= True,
templates_dict = {"sql": sql}
dag=dag,
)

首先我们创建一个Python可调用函数,我们可以用它来执行查询并选择要分支的task_id。其次,我们创建 BranchPythonOperator。

关于Airflow:如何从 BigQueryOperator 插入 xcom 值(value)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52801318/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com