gpt4 book ai didi

sql - Airflow + Pandas read_sql_query() 与提交

转载 作者:行者123 更新时间:2023-12-03 19:31:51 25 4
gpt4 key购买 nike

问题

我可以使用 read_sql() 将 SQL 事务提交到数据库吗?

用例和背景

我有一个用例,我希望允许用户执行一些预定义的 SQL 并返回一个 Pandas 数据帧。在某些情况下,此 SQL 将需要查询预先填充的表,而在其他情况下,此 SQL 将执行一个函数,该函数将写入一个表,然后将查询该表。
此逻辑当前包含在 Airflow DAG 的方法中,以便利用使用 PostgresHook 的 Airflow 可访问的数据库连接信息 - 该方法最终在 PythonOperator 任务中调用。我通过测试了解到 PostgresHook 创建了一个 psycopg2 连接对象。

代码

from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd

def create_df(job_id,other_unrelated_inputs):
conn = job_type_to_connection(job_type) # method that helps choose a database
sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL
sql_template = sql.read()
hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow


try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
# Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj)
except:
#catches some errors#
return df

问题

目前,当执行 SQL 函数时,此代码生成一个数据帧,但不会提交在 SQL 函数中所做的任何数据库更改。例如,更准确地说,如果 SQL 函数将一行插入到表中,则该事务将不会提交并且该行不会出现在表中。

尝试

我尝试了一些修复,但被卡住了。我最近的努力是更改 read_sql 使用的 psycopg2 连接的自动提交属性,以便自动提交事务。

我承认我一直无法弄清楚连接的属性何时会对 SQL 的执行产生影响。

我认识到另一种方法是复制 PostgresHook.run() 中的一些逻辑。提交然后添加一些代码以将结果推送到数据帧中,但如果可能的话,使用已经创建的方法似乎更简洁,更容易为 future 的支持提供支持。

我能找到的最类似的问题是 this one ,但我对独立于 Airflow 的解决方案感兴趣。

编辑
...
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
hook_conn_obj.autocommit = True
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
except:
#catches some errors#
return df

这似乎有效。如果有人对实现这一目标的更好方法有任何评论或想法,我仍然有兴趣从讨论中学习。

谢谢!

最佳答案

read_sql不会提交,因为正如该方法名称所暗示的那样,目标是读取数据,而不是写入。这是来自 pandas 的不错的设计选择.这很重要,因为它可以防止意外写入并允许有趣的场景,例如运行过程、读取其效果但没有任何内容被持久化。 read_sql的意图是阅读,而不是写作。直接表达意图是黄金标准原则。

表达您的意图的更明确的方式是 execute (提交)明确在 fetchall 之前.但是因为pandas没有提供从 cursor 读取的简单方法对象,您将失去 read_sql 提供的安心感并且必须自己创建 DataFrame。

所以总而言之,您的解决方案很好,通过设置 autocommit=True您表示您的数据库交互将持续进行,因此不应该发生意外。读起来有点奇怪,但如果你把你的名字命名为 sql_template变量类似于 write_then_read_sql或在文档字符串中解释,意图会更清楚。

关于sql - Airflow + Pandas read_sql_query() 与提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53752500/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com