gpt4 book ai didi

google-bigquery - 在 Airflow 中循环参数的最佳方法?

转载 作者:行者123 更新时间:2023-12-02 11:57:22 26 4
gpt4 key购买 nike

我正在努力熟悉 Airflow,并且到目前为止我很喜欢它。

但是,我有点不清楚的一件事是如何正确参数化我的 dag,我想在其中运行相同的 dag,但对于多个业务线(lob)并行运行。所以基本上我想在每次运行中为多个 lob 运行下面的 dag,并让每个 lob 并行运行。

假设我定义了一个变量,它是一个 lob 数组,如“lob1”、“lob2”等。我想将下面的 bigquery sql 语句中的“mylob”替换为“lob1”,然后替换为“lob2”等.

我在想也许我可以将 lobs 存储为 ui 中的变量,然后在 dag 中循环它,但我不确定这是否最终会是连续的,因为它等待每个任务在每个任务中完成循环迭代。

我认为的另一种方法可能是使用这个参数化的 dag 作为更大的驱动程序 dag 中的子 dag。但再次不确定这是否是最佳实践方法。

非常感谢任何帮助或指示。我觉得我在这里遗漏了一些明显的东西,但在任何地方都没有找到这样的例子。

"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from dateutil import tz
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 3, 10),
'email': ['xxx@xxx.com'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

with DAG('my_bq_dag_2', schedule_interval='30 */1 * * *',
default_args=default_args) as dag:

bq_msg_1 = BigQueryOperator(
task_id='my_bq_task_1',
bql='select "mylob" as lob, "Hello World!" as msg',
destination_dataset_table='airflow.test1',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='gcp_smoke'
)

bq_msg_1.doc_md = """\
#### Task Documentation
Append a "Hello World!" message string to the table [airflow.msg]
"""

bq_msg_2 = BigQueryOperator(
task_id='my_bq_task_2',
bql='select "mylob" as lob, "Goodbye World!" as msg',
destination_dataset_table='airflow.test1',
write_disposition='WRITE_APPEND',
bigquery_conn_id='gcp_smoke'
)

bq_msg_2.doc_md = """\
#### Task Documentation
Append a "Goodbye World!" message string to the table [airflow.msg]
"""

# set dependencies
bq_msg_2.set_upstream(bq_msg_1)

更新:尝试使其正常工作,但似乎永远无法实现 lob2


"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 3, 10),
'email': ['xxx@xxx.com'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('my_bq_dag_2', schedule_interval='@once',default_args=default_args)

lobs = ["lob1","lob2","lob3"]

for lob in lobs:

templated_command = """
select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
"""

bq_msg_1 = BigQueryOperator(
dag = dag,
task_id='my_bq_task_1',
bql=templated_command,
params={'lob': lob},
destination_dataset_table='airflow.test1',
write_disposition='WRITE_APPEND',
bigquery_conn_id='gcp_smoke'
)

最佳答案

我认为我已经找到了一个似乎对我有用的答案/方法(上面的问题是没有唯一的任务 ID)。

在该示例上发表了一篇小博客文章,以防对其他人有用。

http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/

关于google-bigquery - 在 Airflow 中循环参数的最佳方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42930221/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com