gpt4 book ai didi

airflow - key 错误 : 'ti' in Apache Airflow xcom

转载 作者:行者123 更新时间:2023-12-04 10:22:52 25 4
gpt4 key购买 nike

我们正在尝试运行一个带有 2 个任务的简单 DAG,这些任务将通过 xcom 进行数据通信。

DAG 文件:

from __future__ import print_function
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
'example_xcom',
schedule_interval="@once",
default_args=args)

value_1 = [1, 2, 3]


def push(**kwargs):
# pushes an XCom without a specific target
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def puller(**kwargs):
ti = kwargs['ti']

v1 = ti.xcom_pull(key=None, task_ids='push')
assert v1 == value_1

v1 = ti.xcom_pull(key=None, task_ids=['push'])
assert (v1) == (value_1)


push1 = PythonOperator(
task_id='push', dag=dag, python_callable=push)

pull = BashOperator(
task_id='also_run_this',
bash_command='echo {{ ti.xcom_pull(task_ids="push_by_returning") }}',
dag=dag)

pull.set_upstream(push1)

但是在 Airflow 中运行 DAG 时,我们遇到了以下异常。
[2018-09-27 16:55:33,431] {base_task_runner.py:98} INFO - Subtask: [2018-09-27 16:55:33,430] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/xcom.py
[2018-09-27 16:55:33,694] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-09-27 16:55:33,694] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-09-27 16:55:33,696] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-09-27 16:55:33,697] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-09-27 16:55:33,697] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-09-27 16:55:33,698] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-09-27 16:55:33,699] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-09-27 16:55:33,699] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-09-27 16:55:33,701] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2018-09-27 16:55:33,701] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-09-27 16:55:33,702] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2018-09-27 16:55:33,703] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-09-27 16:55:33,703] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-09-27 16:55:33,704] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/gcs/dags/xcom.py", line 22, in push
[2018-09-27 16:55:33,707] {base_task_runner.py:98} INFO - Subtask: kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
[2018-09-27 16:55:33,708] {base_task_runner.py:98} INFO - Subtask: KeyError: 'ti'

我们验证了 DAG 存在但没有问题,请帮助我们解决此问题。

最佳答案

添加 provide_context: True默认参数。这需要define **kwargs .

args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}

provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.

关于airflow - key 错误 : 'ti' in Apache Airflow xcom,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52541911/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com