gpt4 book ai didi

python - 如何将 PostgreSQL 查询结果传递给 Airflow 中的变量? (Postgres Operator 或 Postgres Hook)

转载 作者:行者123 更新时间:2023-12-05 03:38:37 25 4
gpt4 key购买 nike

我计划使用 PostgreSQL 作为我的任务元信息提供者,所以我想运行一些查询并获取一些数据并将其像填充变量一样传递给另一个任务。问题是当我使用 PostgresHook 时,我得到了数据,但它是在我无法访问的 python 方法中,实际上我看到了下面的行

[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]

这是我的部分代码:

def _query_postgres(**context):
"""
Queries Postgres and returns a cursor to the results.
"""

postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
conn = postgres.get_conn()
cursor = conn.cursor()
mark_williams = cursor.execute(" SELECT * FROM public.aramis_meta_task; ")

# iterate over to get a list of dicts
details_dicts = [doc for doc in cursor]

# serialize to json string
details_json_string = json.dumps(details_dicts, default=json_util.default)

task_instance = context['task_instance']
task_instance.xcom_push(key="my_value", value=details_json_string)
return details_json_string

但我不知道应该使用哪个变量来访问它或如何将它推送到 XCOM,以便我可以将该返回值用作另一个 bashoperator 任务(例如 Spark)的参数。

PostgresOperator 另一方面,结果只返回 None

最佳答案

背后的诡计XComs是你在一个任务中push它们,在另一个任务中pull它们。如果你想在 bash 运算符中使用你在 _query_postgres 函数中推送的 XCom,你可以使用这样的东西:

puller = BashOperator(
task_id="do_something_postgres_result",
bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
dag=dag)

您需要将 bash_command 替换为适当的命令,并更改 xcom_pull() 中的 task_ids 以设置 task_id 来自您创建的调用 _query_postgres 函数的任务。

关于PostgresOperator,返回None就可以了。它不是用于数据提取(即使您运行 SELECT 查询。您使用 PostgresHook 实现它的方式是可以的。

了解 XComs 的一些很好的资源:

  1. https://medium.com/analytics-vidhya/airflow-tricks-xcom-and-subdag-361ff5cd46ff
  2. https://precocityllc.com/blog/airflow-and-xcom-inter-task-communication-use-cases/
  3. https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py

关于python - 如何将 PostgreSQL 查询结果传递给 Airflow 中的变量? (Postgres Operator 或 Postgres Hook),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68889828/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com