
python - How to generate different IDs for Airflow tasks?

Reposted. Author: 行者123. Updated: 2023-12-05 03:40:08

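I'm trying to call a function N times using the @task decorator, but I can't define the task_id with this decorator, and if I try to call it more than once, I get: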

airflow.exceptions.DuplicateTaskIdFound: Task id 'my_task_group.make_request__1' has already been added to the DAG

from airflow.decorators import task

@task
def make_request(params):
    return True

def my_first_function():
    # do stuff
    return make_request(params)  # params is defined elsewhere (not shown)

def my_second_function():
    # do stuff
    return make_request(params)

for i in range(0, 10):
    first = my_first_function()  # this will call "make_request"
    second = my_second_function()  # this will also call "make_request"

    first >> second

How can I dynamically "rename" the task_id on the @task decorator?
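The error message says a task with that ID "has already been added to the DAG", i.e. the DAG keeps one entry per task_id and rejects a second task registered under a name that is already taken. The sketch below is a hypothetical, simplified model of that check (ToyDag and add_task are illustrative names, not Airflow's actual implementation) that reproduces the message from the question.

```python
class DuplicateTaskIdFound(Exception):
    """Toy stand-in for airflow.exceptions.DuplicateTaskIdFound (hypothetical)."""


class ToyDag:
    """Hypothetical, simplified model of a DAG's task registry."""

    def __init__(self) -> None:
        self.task_dict = {}  # task_id -> task object

    def add_task(self, task_id: str, task: object) -> None:
        # Reject any task whose ID is already registered in this toy DAG.
        if task_id in self.task_dict:
            raise DuplicateTaskIdFound(
                f"Task id '{task_id}' has already been added to the DAG"
            )
        self.task_dict[task_id] = task


dag = ToyDag()
dag.add_task("my_task_group.make_request__1", object())
try:
    dag.add_task("my_task_group.make_request__1", object())  # second task, same ID
except DuplicateTaskIdFound as exc:
    print(exc)  # Task id 'my_task_group.make_request__1' has already been added to the DAG
```

In this model, the only way to avoid the exception is to give every task a distinct ID, which is what the answer below relies on.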

Best answer

Using @task lets you generate task_ids dynamically just by calling the decorated function. The docstring of _get_unique_task_id states:

Generate unique task id given a DAG (or if run in a DAG context). Ids are generated by appending a unique number to the end of the original task id.
Example:
task_id
task_id__1
task_id__2
...
task_id__20
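The suffixing rule quoted above can be sketched in plain Python. This is a hypothetical helper (unique_task_id is an illustrative name, not Airflow's _get_unique_task_id), assuming the IDs already present in the DAG are available as a collection: the first use keeps the original name, and later uses get one more than the highest __N suffix already taken.

```python
import re
from typing import Iterable


def unique_task_id(task_id: str, existing_ids: Iterable[str]) -> str:
    """Hypothetical helper: return task_id, or task_id__N with the next free N."""
    existing = set(existing_ids)
    if task_id not in existing:
        return task_id  # first use keeps the original name
    # Strip a trailing numeric suffix such as "__3" to get the base name.
    core = re.split(r"__\d+$", task_id)[0]
    pattern = re.compile(rf"^{re.escape(core)}__(\d+)$")
    suffixes = [int(m.group(1)) for m in map(pattern.match, existing) if m]
    return f"{core}__{max(suffixes, default=0) + 1}"


ids = []
for _ in range(4):
    ids.append(unique_task_id("task_id", ids))
print(ids)  # ['task_id', 'task_id__1', 'task_id__2', 'task_id__3']
```

Taking the maximum existing suffix (rather than counting entries) keeps the result unique even if some suffixed IDs are missing from the sequence.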

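With this feature, there is no need to "rename" the tasks dynamically. In your code example, you should decorate the functions that are called in the loop. Here is a working example, run on version 2.0.1: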

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():

    def make_request(params):
        print(f"Params: {params}")

    def _print_task_id():
        context = get_current_context()
        print(f"Result: {context['ti'].task_id}")

    @task
    def my_first_function():
        _print_task_id()
        context = get_current_context()
        return make_request(context['params'])

    @task
    def my_second_function():
        _print_task_id()
        params = {'foo': 'bar'}
        return make_request(params)

    for i in range(0, 3):
        first = my_first_function()  # this will call "make_request"
        second = my_second_function()  # this will also call "make_request"

        first >> second


example_decorated_dag = task_decorator_example()

This creates the following graph view:

[Figure: graph view of the resulting DAG]

Each task prints its task_id and params; the combined log output looks like this:

- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}
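To illustrate why decorating the functions called in the loop is enough, here is a toy, pure-Python model of the pattern. The toy_task decorator, Node class, REGISTRY and EDGES are hypothetical stand-ins, not Airflow's API: each call of a decorated function declares a new node with a fresh ID (the base name first, then __1, __2, ...), and >> records a dependency, which matches the naming pattern in the log above.

```python
import functools
from typing import Callable, Dict, List, Tuple

REGISTRY: Dict[str, "Node"] = {}   # hypothetical stand-in for the DAG's tasks
EDGES: List[Tuple[str, str]] = []  # recorded (upstream, downstream) pairs
_calls: Dict[str, int] = {}        # how many times each base name was used


class Node:
    def __init__(self, task_id: str, args: tuple, kwargs: dict) -> None:
        self.task_id = task_id
        self.args, self.kwargs = args, kwargs  # stored for later; not run here

    def __rshift__(self, other: "Node") -> "Node":
        EDGES.append((self.task_id, other.task_id))
        return other


def toy_task(func: Callable) -> Callable:
    """Each call declares a new Node; repeat calls get a __N suffix."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Node:
        base = func.__name__
        count = _calls.get(base, 0)
        _calls[base] = count + 1
        task_id = base if count == 0 else f"{base}__{count}"
        # The wrapped body is not executed at call time; the call only declares a node.
        node = Node(task_id, args, kwargs)
        REGISTRY[task_id] = node
        return node
    return wrapper


@toy_task
def my_first_function():
    pass


@toy_task
def my_second_function():
    pass


for i in range(0, 3):
    first = my_first_function()
    second = my_second_function()
    first >> second

print(list(REGISTRY))
# ['my_first_function', 'my_second_function', 'my_first_function__1',
#  'my_second_function__1', 'my_first_function__2', 'my_second_function__2']
```

Because the ID is derived per call inside the decorator, the loop body never has to name a task explicitly, and no two nodes share an ID.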

Hope this works for you!

Regarding "python - How to generate different IDs for Airflow tasks?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/68275679/
