gpt4 book ai didi

python - 在 Apache Airflow 中保存算子的结果

转载 作者:行者123 更新时间:2023-12-03 18:21:34 35 4
gpt4 key购买 nike

几个运算符允许提取数据,但我从未设法使用结果。

例如:
https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py

该运算符可以按如下方式调用:

get_data = BigQueryGetDataOperator(
task_id='get_data_from_bq',
dataset_id='test_dataset',
table_id='Transaction_partitions',
max_results='100',
selected_fields='DATE',
bigquery_conn_id='airflow-service-account'
)

然而,get_data 是 DAG 类型,但第 116 行说“返回 table_data”。
需要明确的是,运算符(operator)工作并检索数据,我只是不明白如何使用数据检索/数据所在的位置。

如何使用上面的“get_data”获取数据?

最佳答案

您将使用的方式 get_data是在下一个任务可以是PythonOperator然后您可以使用它来处理数据。

get_data = BigQueryGetDataOperator(
task_id='get_data_from_bq',
dataset_id='test_dataset',
table_id='Transaction_partitions',
max_results='100',
selected_fields='DATE',
bigquery_conn_id='airflow-service-account'
)

def process_data_from_bq(**kwargs):
ti = kwargs['ti']
bq_data = ti.xcom_pull(task_ids='get_data_from_bq')
# Now bq_data here would have your data in Python list
print(bq_data)

process_data = PythonOperator(
task_id='process_data_from_bq',
python_callable=process_bq_data,
provide_context=True
)

get_data >> process_data

PS:我是 BigQueryGetDataOperator的作者和 Airflow 提交者/PMC

关于python - 在 Apache Airflow 中保存算子的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53960327/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com