gpt4 book ai didi

python - 执行运算符后 Airflow 得到结果

转载 作者:太空宇宙 更新时间:2023-11-03 13:13:13 27 4
gpt4 key购买 nike

我配置了 airflow 并创建了一些 Dag 和 subDag 来调用多个操作符。

我的麻烦是,当运算符(operator)运行并完成作业时,我想以某种 Python 结构接收返回的结果。例如:

File1.py

  ...
...
sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path,
),
task_id=DELP_DAG_NAME,
dag=dag,
)

File2.py

  from airflow import DAG
from airflow.operators import HiveOperator
def subdag_callHive(parent, child, args, step,
user_defined_macros, path
):
dag_subdag = DAG(
dag_id='%s.%s' % (parent, child),
default_args=args,
schedule_interval="@daily",
template_searchpath=path,
user_defined_macros=user_defined_macros,
)

# some work...

HiveOperator(
task_id='some_id',
hiveconf_jinja_translate=True,
hql='select field1 from public.mytable limit 4;',
trigger_rule='all_done',
dag=dag_subdag,
)

return dag_subdag

函数 subdag_callHive 是从另一个 python 脚本调用的,其中定义了主 Dag 和所有其他所需的参数。

我只需要能够从 HiveOperator 获得结果(*select * from public.mytable limit 4;*) 在本例中为 4 个值。

返回的 dag_subdag 是一个对象 并且包含传递给调用的所有属性/数据,但没有关于 HiveOperator 做了什么的信息。

这可能吗?如果是的话,如何实现。

最佳答案

您可以根据需要使用 Hooks。 HiveOperator 基本上做同样的事情,他调用 Hive Hooks,它有多种方法来处理结果。

使用 PythonOperator 调用一个函数,然后启动一个配置单元 Hook 。

以下示例可能对您有所帮助。

代码片段:

callHook = PythonOperator(
task_id='foo',
python_callable=do_work,
dag=dag
)

def do_work():
hiveserver = HiveServer2Hook()
hql = "SELECT COUNT(*) FROM foo.bar"
row_count = hiveserver.get_records(hql, schema='foo')
print row_count[0][0]

所有可用的方法都可以在这里找到:https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py

关于python - 执行运算符后 Airflow 得到结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38039045/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com