gpt4 book ai didi

python - 在 Apache Airflow 的自定义运算符中访问 params 参数

转载 作者:太空狗 更新时间:2023-10-30 01:31:06 24 4
gpt4 key购买 nike

问题

我想将值列表或任何值作为参数传递给自定义运算符,修改运算符中的值,然后通过 { { params }} 宏。

当前设置

以下是我的设置的相关部分,为清楚起见略微做作。

DAG 定义:

from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator

default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 2, 27),
'provide_context': True,
'depends_on_past': True
}

dag = DAG(
'etl',
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60),
template_searchpath=tmpl_search_path,
default_args=default_args,
max_active_runs=1)

process_product_dim = ProcessDimensionOperator(
task_id='process_product_dim',
mysql_conn_id='mysql_dwh',
sql='process_dimension.sql',
database='dwh',
col_names=[
'id',
'name',
'category',
'price',
'available',
'country',
],
t_name='products',
dag=dag)

运算符定义:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class ProcessDimensionOperator(BaseOperator):
template_fields = (
'sql',
'parameters')
template_ext = ('.sql',)

@apply_defaults
def __init__(
self,
sql,
t_name,
col_names,
database,
mysql_conn_id='mysql_default',
*args, **kwargs):
super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.t_name = t_name
self.col_names = col_names
self.database = database
self.mysql_conn_id = mysql_conn_id
self.parameters = parameters

def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)

self.params['col_names'] = self.col_names
self.params['t_name'] = self.t_name
self.params['match_statement'] = self.construct_match_statement(self.col_names)

hook.run(sql=self.sql)

def construct_match_statement(self, cols):
map_list = map(lambda x: f'and t.{x} = s.{x}', cols[1:])

return ' '.join(map_list)

进程维度.sql

create table if not exists staging.{{ params.t_name }};

select
*
from
source.{{ params.t_name }} as source
join
target.{{ params.t_name }} as target
on source.id = target.id {{ params.match_statement }}

但这会引发错误,因为 {{ params.t_name }}{{ params.match_statement}} 呈现为 null。

我尝试过的

  • 在任务定义的params 参数中设置t_namec_name,并将映射/连接逻辑留在sql 模板中。这行得通,但如果可能的话,我想将逻辑保留在模板之外
  • params={xxx} 传递到 super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
  • 将参数作为 parameters={xxx} 传递到 hook.run() 方法中,并使用 %(x)s 对其进行模板化但这会导致问题,因为它会在变量周围加上引号,从而弄乱各种 sql 语句

我对 python 和 Airflow 还很陌生,所以我可能会遗漏一些明显的东西,非常感谢任何帮助,谢谢!

最佳答案

这里也一样。我刚刚花了几个小时(几天?)找出问题的原因(上帝保佑 IPython.embed 和日志记录)。从 Airflow 1.10.3 开始,它是由 TaskInstance.render_templates() 引起的,在呈现任何 template_fields 或 template_exts 之后,它不会更新 Jinja 上下文,只会更新任务属性。看吧here !

因此你只需要使用

{{ task.params.whatever }}

代替

{{ params.whatever }}

在您的 .sql 模板文件中。

事实上,如果Jinja上下文不断更新,那么就真的要注意模板的顺序和依赖关系了。这是一种嵌套/递归渲染。它也可能有性能缺点。

此外,我不建议使用“parameters”(与“params”不同),因为它们似乎旨在作为参数传递给数据库游标,然后您将无法传递数字/整数、列名或表名,或只是一个 SQL 片段(例如,where、having、limit、...)。

关于python - 在 Apache Airflow 的自定义运算符中访问 params 参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54911055/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com