gpt4 book ai didi

python - 使用 Apache Airflow 执行包含 PySpark 代码的 Databricks Notebook

转载 作者:行者123 更新时间:2023-12-01 00:48:29 26 4
gpt4 key购买 nike

我正在使用 Airflow、Databricks 和 PySpark。我想知道当我想通过 Airflow 执行 Databricks Notebook 时是否可以添加更多参数。

我用 Python 编写了下一个代码,名为 MyETL:

def main(**kwargs):
spark.sql("CREATE TABLE {0} {1}".format(table, columns))
print("Running my ETL!")

if __name__== "__main__":
main(arg1, arg2)

我想定义其他任务参数来运行具有更多参数的 Databricks 笔记本,我想添加方法的名称以及这些方法的参数。例如,当我想在 Airflow 的 DAG 中注册任务时:

   notebook_task_params = {
'new_cluster': new_cluster,
'notebook_task': {
'notebook_path': '/Users/airflow@example.com/MyETL',
'method_name': 'main',
'params':'[{'table':'A'},{'columns':['a', 'b']}]'
},
}

我不知道这是否可能,因为我没有找到类似的例子。

# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
task_id='notebook_task',
dag=dag,
json=notebook_task_params)

换句话说,我想使用 Airflow 执行带有参数的笔记本。我的问题是我该怎么做?

最佳答案

您也可以将 method_name 添加为 params,然后在笔记本上解析出您的逻辑。

但是,这里更常见的模式是确保该方法已安装在您的集群上。

params = '[{'table':'A'},{'columns':['a', 'b']}]'

然后在 databricks 上的笔记本中:

table = getArgument("table", "DefaultValue")
columns = getArgument("columns", "DefaultValue")

result = method(table, columns)
<小时/>

如果您可以在笔记本作业运行中看到参数(上面附有图像),您还会知道是否可以通过 getArgument() 访问参数。

enter image description here

关于python - 使用 Apache Airflow 执行包含 PySpark 代码的 Databricks Notebook,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56757281/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com