gpt4 book ai didi

python - 如何将数据从一个运算符(operator)传递给另一个运算符(operator)

转载 作者:行者123 更新时间:2023-12-03 19:36:49 39 4
gpt4 key购买 nike

我制作了一个自定义 Airflow 操作符,这个操作符接受一个输入,这个操作符的输出在 XCOM 上。

我想要实现的是使用一些定义的输入调用运算符,将输出解析为可在分支运算符内部调用的 Python,然后将解析的输出传递给另一个调用相同运算符树的任务:

CustomOperator_Task1 = CustomOperator(
data={
'type': 'custom',
'date': '2017-11-12'
},
task_id='CustomOperator_Task1',
dag=dag)

data = {}
def checkOutput(**kwargs):
result = kwargs['ti'].xcom_pull(task_ids='CustomOperator_Task1')

if result.success = True:
data = result.data
return "CustomOperator_Task2"
return "Failure"

BranchOperator_Task = BranchPythonOperator(
task_id='BranchOperator_Task ',
dag=dag,
python_callable=checkOutput,
provide_context=True,
trigger_rule="all_done")

CustomOperator_Task2 = CustomOperator(
data= data,
task_id='CustomOperator_Task2',
dag=dag)

CustomOperator_Task1 >> BranchOperator_Task >> CustomOperator_Task2

在任务中 CustomOperator_Task2我想传递来自 BranchOperator_Task 的解析数据.现在它总是空的 {}
最好的方法是什么?

最佳答案

我现在看到你的问题了。设置 data由于 Airflow 的工作方式,像您这样的变量将无法工作。一个完全不同的进程将运行下一个任务,所以它不会有什么 data 的上下文。被设置为。

相反,BranchOperator_Task必须将解析的输出推送到另一个 XCom 所以 CustomOperator_Task2可以显式地获取它。

def checkOutput(**kwargs):
ti = kwargs['ti']
result = ti.xcom_pull(task_ids='CustomOperator_Task1')

if result.success:
ti.xcom_push(key='data', value=data)
return "CustomOperator_Task2"
return "Failure"

BranchOperator_Task = BranchPythonOperator(
...)

CustomOperator_Task2 = CustomOperator(
data_xcom_task_id=BranchOperator_Task.task_id,
data_xcom_key='data',
task_id='CustomOperator_Task2',
dag=dag)

那么您的运算符(operator)可能看起来像这样。
class CustomOperator(BaseOperator):

@apply_defaults
def __init__(self, data_xcom_task_id, data_xcom_key, *args, **kwargs):
self.data_xcom_task_id = data_xcom_task_id
self.data_xcom_key = data_xcom_key
def execute(self, context):
data = context['ti'].xcom_pull(task_ids=self.data_xcom_task_id, key=self.data_xcom_key)
...

如果您只想对它们进行硬编码,则可能不需要参数。这取决于您的用例。

关于python - 如何将数据从一个运算符(operator)传递给另一个运算符(operator),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47758033/

39 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com