gpt4 book ai didi

airflow - 如何使用 bigquery 运算符将查询参数传递给 sql 文件

转载 作者:行者123 更新时间:2023-12-03 23:28:18 24 4
gpt4 key购买 nike

我需要在 sql 文件中访问 BigqueryOperator 传递的参数,但我收到错误 ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'>我正在使用以下代码:

t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
'xxxx',
'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag

)

Sql:

select  cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3
from (select cast(@threshold_date as date) as dt_2) a

我使用的是谷歌 Composer 版本composer-1.7.0-airflow-1.10.2

提前致谢。

最佳答案

深入研究源代码后,BigQueryHook 似乎在 Airflow 1.10.3 中修复了一个错误。

您定义 query_params 的方式对于较新版本的 Airflow 是正确的,根据 BigQuery API 应该是一个列表:请参阅 https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python .

无论如何,您会收到此错误,因为在 Airflow 1.10.2 中,query_params 被定义为 dict,请参阅:

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L678

query_param_list = [
...
(query_params, 'queryParameters', None, dict),
...
]

这会导致内部 _validate_value 函数抛出 TypeError :

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L1954

def _validate_value(key, value, expected_type):
""" function to check expected type and raise
error if type is not correct """
if not isinstance(value, expected_type):
raise TypeError("{} argument must have a type {} not {}".format(
key, expected_type, type(value)))

我在 Airflow 1.10.2(或任何单元测试...)中没有找到任何 query_params 示例,但我认为这只是因为它不可用。

这些错误已被这些提交修复:

这些更改已嵌入 Airflow 1.10.3,但截至目前,Airflow 1.10.3 在 Composer (https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments) 中不可用:最新版本已于 2019 年 5 月 16 日发布并嵌入版本 1.10.2 .

等待这个新版本,我看到了两种方法来解决您的问题:

  • 复制/粘贴 BigQueryOperatorBigQueryHook 的固定版本并将它们嵌入到您的源中以使用它们,或者扩展现有的 BigQueryHook 并覆盖错误的方法。我不确定您是否可以直接修补 BigQueryHook(在 Composer 环境中无法访问这些文件)
  • 自己模板化您的 SQL 查询(而不是使用 query_params)

关于airflow - 如何使用 bigquery 运算符将查询参数传递给 sql 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56287061/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com