gpt4 book ai didi

google-bigquery - 一个 BigQueryOperator 的模板化 destination_dataset_table arg 从另一个模板化

转载 作者:行者123 更新时间:2023-12-01 22:29:52 27 4
gpt4 key购买 nike

我正在尝试在 ETL 管道中将一堆 BigQuery SQL 命令链接在一起,其中一些输出和输入将带有时间戳。

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

DAG_NAME = 'foo'

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(7),
'email': ['xxx@xxx.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG(
dag_id="blah",
default_args=default_args,
schedule_interval=None,
template_searchpath=["/usr/local/airflow/dags/xxx/sql"])


GOOGLE_PROJECT_ID = 'xxx'
DATASET_ID = 'xxx'
first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_" + '{{ ds_nodash }}'
second_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "second_output"
GOOGLE_CLOUD_PLATFORM_CONNECTION_ID="google_cloud_default"


first_op = BigQueryOperator(
task_id='first_output',
dag=dag,
bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
bql="XXX.sql",
use_legacy_sql=True,
allow_large_results=True,
destination_dataset_table=first_output # {{ ds }} gets substituted because destination_dataset_table is a templated field
)

second_op = BigQueryOperator(
task_id='second_op',
dag=dag,
bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
bql="XXX_two.sql", # XXX_two.sql contains a {{ params.input_table }} reference
params={'input_table': first_op.destination_dataset_table},
use_legacy_sql=True,
allow_large_results=True,
destination_dataset_table=second_output

)

second_op.set_upstream(first_op)

XXX_two.sql 的内容:

SELECT * FROM [{{ params.input_table }}

测试通过:

airflow test blah second_op  2015-06-01

我目前的错误是(也在生产中)

Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'location': BLAH, 'message': 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}'}. 

如何在运算符执行之外访问模板化字段?

最佳答案

destination_dataset_table 字段肯定是模板化的,可以看出in the source code (1.9的,没有提供版本所以我拿的是最新的):

template_fields = ('bql', 'destination_dataset_table')

我会将创建字符串更改为:

first_output = "[{project}:{dataset}.first_output_{{{{ ds_nodash }}}}]".format(
project=GOOGLE_PROJECT_ID,
dataset=DATASET_ID)

四个花括号应该变成两个,结果字符串应该是这样的

[my_project:my_dataset.first_output_{{ ds_nodash }}]

现在 ds_nodashdestination_dataset_table 中使用时应该被解析。

请注意,我还为遗留语句添加了所需的括号 [ ]。我不确定这是否也与缺少括号有关。

编辑

正如@mask 正确指出的那样,您在 second_op params 中使用了来自 first_op 的字符串,我一开始没见过。

由于这些原因,这不起作用:

  • first_op 不应提供字符串,但您应该使用 first_output - 我仍然想知道为什么它首先起作用
  • 如果您从任务中提取字符串,您不会得到呈现的字符串,但始终是原始的模板字符串 *如果您不确定字段是否已被处理(如面具)
  • params 根本没有模板化,因此不会正确更新

这些是我能想到的解决方案:

  • 派生您自己的 BigDataOperator 并将 params 添加到模板化字段(如果可行 b/c 它是一个字典)
  • 或者扩展xxx_two.sql,这样它就不会使用params.input_table,也不会使用first_output。由于您希望 first_output 在模板中可用,因此您必须首先将其添加到 DAG 参数 user_defined_macros

要了解有关这些解决方案的更多信息,请查看此相关问题:Make custom Airflow macros expand other macros

关于google-bigquery - 一个 BigQueryOperator 的模板化 destination_dataset_table arg 从另一个模板化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51315376/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com