作者热门文章
- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我想使用 sqlalchemy 核心使用 postgresql 9.5 添加的"new"功能进行更新插入。在实现时,我对语法感到很困惑,无法适应我的需要。这是我希望能够执行的操作的示例代码:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'test'
a_id = Column('id',Integer, primary_key=True)
a = Column("a",Integer)
engine = create_engine('postgres://name:password@localhost/test')
User().metadata.create_all(engine)
meta = MetaData(engine)
meta.reflect()
table = Table('test', meta, autoload=True)
conn = engine.connect()
from sqlalchemy.dialects.postgresql import insert as psql_insert
stmt = psql_insert(table).values({
table.c['id']: bindparam('id'),
table.c['a']: bindparam('a'),
})
stmt = stmt.on_conflict_do_update(
index_elements=[table.c['id']],
set_={'a': bindparam('a')},
)
list_of_dictionary = [{'id':1, 'a':1, }, {'id':2, 'a':2,}]
conn.execute(stmt, list_of_dictionary)
我基本上想插入大量行,如果一个 id 已经被占用,我想用我最初想插入的值更新它。但是 sqlalchemy 抛出这个错误:
CompileError: bindparam() name 'a' is reserved for automatic usage in the VALUES or SET clause of this insert/update statement. Please use a name other than column name when using bindparam() with insert() or update() (for example, 'b_a').
虽然这是一个已知问题(请参阅 https://groups.google.com/forum/#!topic/sqlalchemy/VwiUlF1cz_o),但我没有找到任何不需要修改 list_of_dictionary 的键或列名的正确答案。
我想知道是否有一种方法可以构造 stmt 以具有一致的行为,这种行为不依赖于变量 list_of_dictionary 的键是否是插入表的列名(我的代码没有在这些情况下会出错)。
最佳答案
这对我有用:
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.inspection import inspect
def upsert(engine, schema, table_name, records=[]):
metadata = MetaData(schema=schema)
metadata.bind = engine
table = Table(table_name, metadata, schema=schema, autoload=True)
# get list of fields making up primary key
primary_keys = [key.name for key in inspect(table).primary_key]
# assemble base statement
stmt = postgresql.insert(table).values(records)
# define dict of non-primary keys for updating
update_dict = {
c.name: c
for c in stmt.excluded
if not c.primary_key
}
# cover case when all columns in table comprise a primary key
# in which case, upsert is identical to 'on conflict do nothing.
if update_dict == {}:
warnings.warn('no updateable columns found for table')
# we still wanna insert without errors
insert_ignore(table_name, records)
return None
# assemble new statement with 'on conflict do update' clause
update_stmt = stmt.on_conflict_do_update(
index_elements=primary_keys,
set_=update_dict,
)
# execute
with engine.connect() as conn:
result = conn.execute(update_stmt)
return result
关于python - 如何在 postgresql 上使用 sqlalchemy 进行正确的更新插入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41724658/
我是一名优秀的程序员,十分优秀!