gpt4 book ai didi

python - 我可以 get() 或 xcom.pull() Airflow 脚本的 MAIN 部分中的变量(在 PythonOperator 之外)吗?

转载 作者:行者123 更新时间:2023-12-01 00:23:23 28 4
gpt4 key购买 nike

我遇到了一种情况,我需要在 S3 中找到特定文件夹以传递到 Airflow 脚本中的 PythonOperator。我正在使用另一个找到正确目录的 PythonOperator 来执行此操作。我可以成功地使用 xcom.push() 或 Variable.set() 并在 PythonOperator 中将其读回。问题是,我需要将此变量传递给使用 python 库中的代码的单独的 PythonOperator。因此,我需要在 Airflow 脚本的主要部分中使用 Variable.get() 或 xcom.pull() 这个变量。我已经搜索了很多,似乎无法弄清楚这是否可能。下面是一些引用代码:

    def check_for_done_file(**kwargs):

### This function does a bunch of stuff to find the correct S3 path to
### populate target_dir, this has been verified and works

Variable.set("target_dir", done_file_list.pop())
test = Variable.get("target_dir")
print("TEST: ", test)

#### END OF METHOD, BEGIN MAIN

with my_dag:

### CALLING METHOD FROM MAIN, POPULATING VARIABLE

check_for_done_file_task = PythonOperator(
task_id = 'check_for_done_file',
python_callable = check_for_done_file,
dag = my_dag,
op_kwargs = {
"source_bucket" : "my_source_bucket",
"source_path" : "path/to/the/s3/folder/I/need"
}
)

target_dir = Variable.get("target_dir") # I NEED THIS VAR HERE.

move_data_to_in_progress_task = PythonOperator(
task_id = 'move-from-incoming-to-in-progress',
python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
dag = my_dag,
op_kwargs = {
"source_bucket" : "source_bucket",
"source_path" : "path/to/my/s3/folder/" + target_dir,
"destination_bucket" : "destination_bucket",
"destination_path" : "path/to/my/s3/folder/" + target_dir,
"recurse" : True
}
)

那么,完成此操作的唯一方法是增强库以查找“target_dir”变量吗?我认为 Airflow main 没有上下文,因此我想做的事情可能是不可能的。请各位 Airflow 专家参与进来,让我知道我的选择是什么。

最佳答案

op_kwargs 是一个模板化字段。所以你可以使用xcom_push:

def check_for_done_file(**kwargs):
...
kwargs['ti'].xcom_push(value=y)

并在op_kwargs中使用jinja模板:

   move_data_to_in_progress_task = PythonOperator(
task_id = 'move-from-incoming-to-in-progress',
python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
dag = my_dag,
op_kwargs = {
"source_bucket" : "source_bucket",
"source_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
"destination_bucket" : "destination_bucket",
"destination_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
"recurse" : True
}
)

此外,将 provide_context=True 添加到您的 check_for_done_file_task 任务中,以将上下文字典传递给可调用项。

关于python - 我可以 get() 或 xcom.pull() Airflow 脚本的 MAIN 部分中的变量(在 PythonOperator 之外)吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58803002/

28 4 0
文章推荐: jquery - 让 Jquery 在可放置和可拖动的
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com