gpt4 book ai didi

google-bigquery - 如何重复 BigQueryOperator Dag 并将不同的日期传递给我的 sql 文件

转载 作者:行者123 更新时间:2023-12-04 07:22:50 24 4
gpt4 key购买 nike

我有一个要使用 BigQueryOperator 运行的查询。每天,它将运行过去 21 天。 sql 文件保持不变,但传递给文件的日期发生了变化。因此,例如今天,它将为今天的日期运行,然后为昨天的日期重复,然后重复 2 天前,一直到 21 天前。所以它将在 7/14/2021 运行,所以我需要将此日期传递给我的 sql 文件。然后它将运行 7/13/2021,我需要传递给我的 sql 文件的日期是 7/13/2021。如何在日期范围内重复此 dag,并将此日期动态传递给 sql 文件。
在 BigQueryOperator 中,变量在“user_defined_macros,”部分中传递,所以我不知道如何更改我传递的日期。我想过遍历日期数组,但我不知道如何传递该日期到 BigQueryOperator 中链接的 sql 文件。
我的 sql 文件有 300 行长,所以我在下面包含了一个简单的例子,因为人们似乎要求一个。
有向无环图

with DAG(
dag_id,
schedule_interval='0 12 * * *',
start_date=datetime(2021, 1, 1),
template_searchpath='/opt/airflow/dags',
catchup=False,
user_defined_macros={"varsToPass":Var1
}

) as dag:
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE'

)
sql文件
SELECT * FROM table WHERE date = {{CHANGING_DATE}}

最佳答案

您的代码令人困惑,因为您描述了 today,today-1 day, ..., today - 21 days 的重复模式但是您的代码显示 write_disposition = 'WRITE_TRUNCATE'这意味着只有最后一个查询很重要,因为每个查询都会删除前一个查询的结果。由于没有提供更多信息,我假设您实际上是想在今天到今天之间运行一个查询 - 21 天。
您也没有提到您所指的日期是否是 Airflow execution_date或今天日期。
如果是 execution_date您不需要传递任何参数。 SQL 需要是:

SELECT * FROM table WHERE date BETWEEN {{ execution_date }} AND
{{ execution_date - macros.timedelta(days=21) }}
如果是今天,那么您需要使用 params 传递参数:
from datetime import datetime
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE',
params={
"end": datetime.utcnow().strftime('%Y-%m-%d'),
"start": (datetime.now() - datetime.timedelta(days=21)).strftime('%Y-%m-%d')
}

)
然后在 SQL 中,您可以将其用作:
SELECT * FROM table WHERE date BETWEEN {{ params.start }} AND
{{ params.end }}
我想指出的是,如果您不使用 execution_date然后我看不到从 Airflow 传递日期的值(value)。您可以直接使用 BigQuery 将查询设置为:
SELECT *
FROM table
WHERE date BETWEEN DATE_SUB(current_date(), INTERVAL 21 DAY) AND current_date()
如果我的假设不正确并且您想运行 21 个查询,那么您可以按照您的描述使用循环来执行此操作:
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
a = []
for i in range(0, 21):
a.append(
BigQueryOperator(
task_id=f'query_{i}',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table='table',
write_disposition='WRITE_TRUNCATE', # This is probably wrong, I just copied it from your code.
params={
"date_value": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
}
)

)
if i not in [0]:
a[i - 1] >> a[i]
然后在您的 /sql/something.sql查询应该是:
SELECT * FROM table WHERE date = {{ params.date_value }}
如前所述,这将创建一个工作流程:
enter image description here
另请注意 BigQueryOperator已弃用。您应该使用 BigQueryExecuteQueryOperator可通过 Google 提供商获得
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
有关如何安装 Google provider 的更多信息,请参阅以下内容的第二部分 answer .

关于google-bigquery - 如何重复 BigQueryOperator Dag 并将不同的日期传递给我的 sql 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68386323/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com