gpt4 book ai didi

airflow - 在 Airflow 中创建动态池

转载 作者:行者123 更新时间:2023-12-05 06:30:22 29 4
gpt4 key购买 nike

我有一个 DAG,它创建一个集群,开始计算任务,并在它们完成后拆除集群。我想将此集群上承载的计算任务的并发数限制为固定数量。所以从逻辑上讲,我需要一个专用于任务创建的集群的池。我不想干扰其他 DAG 或同一 DAG 的不同运行。

我认为我可以通过在创建集群后从任务动态创建池并在计算任务完成后将其删除来解决这个问题。我想我可以模板化计算任务的 pool 参数,让它们使用这个动态创建的集群。

# execute registers a pool and returns with the pool name
create_pool = CreatePoolOperator(
slots=4,
task_id='create_pool',
dag=self
)

# the pool parameter is templated
computation = ComputeOperator(
task_id=compute_subtask_name,
pool="{{ ti.xcom_pull(task_ids='create_pool') }}",
dag=self
)

create_pool >> computation

但是这样计算任务将永远不会被触发。所以我认为 pool 参数在被模板化之前保存在任务实例中。我想听听您对如何实现理想行为的想法。

最佳答案

这里是一个运算符,如果它不存在则创建一个池。

from airflow.api.common.experimental.pool import get_pool, create_pool
from airflow.exceptions import PoolNotFound
from airflow.models import BaseOperator
from airflow.utils import apply_defaults


class CreatePoolOperator(BaseOperator):
# its pool blue, get it?
ui_color = '#b8e9ee'

@apply_defaults
def __init__(
self,
name,
slots,
description='',
*args, **kwargs):
super(CreatePoolOperator, self).__init__(*args, **kwargs)
self.description = description
self.slots = slots
self.name = name

def execute(self, context):
try:
pool = get_pool(name=self.name)
if pool:
self.log(f'Pool exists: {pool}')
return
except PoolNotFound:
# create the pool
pool = create_pool(name=self.name, slots=self.slots, description=self.description)
self.log(f'Created pool: {pool}')

可以用类似的方式删除池。

关于airflow - 在 Airflow 中创建动态池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52426489/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com