gpt4 book ai didi

python - 没有xcom的任务之间的 Airflow 通信

转载 作者:行者123 更新时间:2023-12-05 05:05:53 30 4
gpt4 key购买 nike

我发现 xcom 实际上是将数据写入数据库并从其他任务中提取数据。我的数据集很大,将其腌制并写入数据库会导致一些不必要的延迟。有没有办法在不使用 xcom 的情况下在同一 Airflow Dag 中的任务之间传递数据?

下面是我试过的代码,实际上没有传递上下文。我知道使用 task_instance.xcom_push() 会起作用,但它也会腌制数据并将其写入我不需要的数据库。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pandas import DataFrame
import pandas as pd
from custom.dataframe_to_postgres_operator import PostgresOperatorBulk
from airflow.operators.postgres_operator import PostgresOperator

def read_df(task_instance, **context):
df = pd.read_parquet('/usr/local/airflow/data/df.parquet.gzip')
print(df)
# task_instance.xcom_push('data', df)
context.update({'data': df})
for k, v in context.items():
print(k, v)
return 1

def get_df(task_instance, **context):
for k, v in context.items():
print(k, v)
df = context['data']

default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2020, 2, 17),
'retries': 0,
}

dag = DAG('abcdefg', default_args=default_args, schedule_interval=timedelta(days=1))

task_read_df = PythonOperator(
task_id='read_df',
python_callable=read_df,
dag=dag,
provide_context=True,
do_xcom_push=False
)

task_get_df = PythonOperator(
task_id='get_df',
python_callable=get_df,
dag=dag,
provide_context=True,
do_xcom_push=False
)

task_read_df >> task_get_df

最佳答案

如果您有一个大型数据集需要交换,我建议将数据存储在某种形式的临时位置(例如指定目录),然后使用 XCOM< 将路径传递给此类临时文件或文件(这对于小数据 block 来说很便宜并且提供了足够好的性能)。

为此,一个好的图书馆是 tempfile这有助于减轻避免临时文件重复的痛苦。

为什么 XCOM 而不是在任务之间共享执行上下文

鉴于任务可以并行执行,这是给 Python(GIL,并行共享数据)带来很多困难的第一个问题。

其次,为了确保某种形式的持久性(以及因此对故障的恢复能力),您必须使用数据库来确保 ACID .

这一切都让 XCOM 机制变得相对沉重(尤其是当您在其之上添加 pickling 时)但它是通用的。

考虑到所有这些,您必须记住,使用通过 XCOM 传递路径的临时文件确实确保了与 XCOM 本身相同的弹性水平(特别是如果文件存储在 RAM 磁盘上)。它还不支持重播任务,除非您无限期保留临时文件。

关于python - 没有xcom的任务之间的 Airflow 通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60282738/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com