- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 结合使用,以降低通过连接和过滤几个不同的表创建的数据集。 documentation表明这应该是可能的。
(下面的示例不包括任何连接,因为它们不需要复制问题。)
我构建我的连接字符串,创建一个 SQLAlchemy 引擎和对应于我的数据库中的一个表的表。 (我正在使用 PostgreSQL。)
import dask.dataframe as dd
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.sql import select
username = 'username'
password = 'password'
server = 'prod'
database = 'my_db'
connection_string = f'postgresql+psycopg2://{username}:{password}@{server}/{database}'
engine = create_engine(connection_string)
metadata = MetaData()
t = Table('my_table', metadata,
Column('id'),
schema='my_schema')
我能够构建一个选择并将其与 SQLAlchemy 一起使用,没有任何问题
>>> s = select([t]).limit(5)
>>> rp = engine.execute(s)
>>> rp.fetchall()
[(3140757,), (3118225,), (3156070,), (3193075,), (3114614,)]
我还可以将 SQLAlchey select 提供给 panda 的 read_sql,效果很好
>>> pd.read_sql(s, connection_string)
id
0 3140757
1 3118225
2 3156070
3 3193075
4 3114614
但是,当我将相同的选择传递给 dask 时,我得到了一个 ProgrammingError。它表明 dask 正在调头并调用 pandas.read_sql,所以你会认为它应该可以工作,但显然有些事情不是。
>>> dd.read_sql_table(s, connection_string, index_col='id')
---------------------------------------------------------------------------
ProgrammingError Traceback (most recent call last)
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1192 parameters,
-> 1193 context)
1194 except BaseException as e:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
508 def do_execute(self, cursor, statement, parameters, context=None):
--> 509 cursor.execute(statement, parameters)
510
ProgrammingError: subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id
^
HINT: For example, FROM (SELECT ...) [AS] foo.
The above exception was the direct cause of the following exception:
ProgrammingError Traceback (most recent call last)
<ipython-input-5-0db95e60f442> in <module>
----> 1 dd.read_sql_table(s, connection_string, index_col='id')
C:\miniconda3\envs\my_env\lib\site-packages\dask\dataframe\io\sql.py in read_sql_table(table, uri, index_col, divisions, npartitions, limits, columns, bytes_per_chunk, head_rows, schema, meta, engine_kwargs, **kwargs)
116 # derrive metadata from first few rows
117 q = sql.select(columns).limit(head_rows).select_from(table)
--> 118 head = pd.read_sql(q, engine, **kwargs)
119
120 if head.empty:
C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_sql(sql, con, index_col, coerce_float, params, parse_dates, columns, chunksize)
395 sql, index_col=index_col, params=params,
396 coerce_float=coerce_float, parse_dates=parse_dates,
--> 397 chunksize=chunksize)
398
399
C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_query(self, sql, index_col, coerce_float, parse_dates, params, chunksize)
1061 args = _convert_params(sql, params)
1062
-> 1063 result = self.execute(*args)
1064 columns = result.keys()
1065
C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in execute(self, *args, **kwargs)
952 def execute(self, *args, **kwargs):
953 """Simple passthrough to SQLAlchemy connectable"""
--> 954 return self.connectable.execute(*args, **kwargs)
955
956 def read_table(self, table_name, index_col=None, coerce_float=True,
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, statement, *multiparams, **params)
2073
2074 connection = self.contextual_connect(close_with_result=True)
-> 2075 return connection.execute(statement, *multiparams, **params)
2076
2077 def scalar(self, statement, *multiparams, **params):
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object, *multiparams, **params)
946 raise exc.ObjectNotExecutableError(object)
947 else:
--> 948 return meth(self, multiparams, params)
949
950 def _execute_function(self, func, multiparams, params):
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\sql\elements.py in _execute_on_connection(self, connection, multiparams, params)
267 def _execute_on_connection(self, connection, multiparams, params):
268 if self.supports_execution:
--> 269 return connection._execute_clauseelement(self, multiparams, params)
270 else:
271 raise exc.ObjectNotExecutableError(self)
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_clauseelement(self, elem, multiparams, params)
1058 compiled_sql,
1059 distilled_params,
-> 1060 compiled_sql, distilled_params
1061 )
1062 if self._has_events or self.engine._has_events:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1198 parameters,
1199 cursor,
-> 1200 context)
1201
1202 if self._has_events or self.engine._has_events:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1411 util.raise_from_cause(
1412 sqlalchemy_exception,
-> 1413 exc_info
1414 )
1415 else:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
263 exc_type, exc_value, exc_tb = exc_info
264 cause = exc_value if exc_value is not exception else None
--> 265 reraise(type(exception), exception, tb=exc_tb, cause=cause)
266
267 if py3k:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
246 value.__cause__ = cause
247 if value.__traceback__ is not tb:
--> 248 raise value.with_traceback(tb)
249 raise value
250
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1191 statement,
1192 parameters,
-> 1193 context)
1194 except BaseException as e:
1195 self._handle_dbapi_exception(
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
507
508 def do_execute(self, cursor, statement, parameters, context=None):
--> 509 cursor.execute(statement, parameters)
510
511 def do_execute_no_params(self, cursor, statement, context=None):
ProgrammingError: (psycopg2.ProgrammingError) subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id
^
HINT: For example, FROM (SELECT ...) [AS] foo.
[SQL: 'SELECT id \nFROM (SELECT my_schema.my_table.id AS id \nFROM my_schema.my_table \n LIMIT %(param_1)s) \n LIMIT %(param_2)s'] [parameters: {'param_1': 5, 'param_2': 5}] (Background on this error at: http://sqlalche.me/e/f405)
最佳答案
正如 Chris 在另一个答案中所说的那样,Dask 将您的查询包装成某种形式 SELECT columns FROM (yourquery)
,这对于 PostgreSQL 来说是无效的语法,因为它需要一个带括号的别名表达。无需重新实现整个 read_sql_table
方法,只需将 .alias('somename')
添加到您的选择即可为表达式添加别名,即
select([t]).limit(5).alias('foo')
当被 Dask 包装时,该表达式会为 Postgres 生成正确的语法
SELECT columns FROM (yourquery) AS foo
关于python - 使用 SQLAlchemy 表达式时出现 Dask read_sql_table 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54819271/
我正在尝试创建一个使用 UUID 作为主键的用户模型: from src.db import db # SQLAlchemy instance import sqlalchemy_utils impo
在 sqlalchemy 中,我试图合并表,然后使用 WHERE 和 ORDER_BY 创建别名 有点像 SELECT * FROM ( SELECT [TABLE_ONE].[SOME_ID]
我正在使用 SQL Alchemy(通过 Flask_sqlalchemy)将 Python 字典列表插入到 Postgres 数据库中。 其中一个表是所有唯一项目的列表(表 1),而第二个是与某个项
This source详细说明如何使用关联代理创建具有 ORM 对象值的 View 和对象。 但是,当我附加一个与数据库中现有对象匹配的值(并且所述值是唯一的或主键)时,它会创建一个冲突的对象,因此我
SQLAlchemy Core和SQLAlchemy ORM的目的有什么区别? 最佳答案 顾名思义,ORM是一个对象关系映射器:其目的是将数据库关系表示为Python对象。 核心是查询构建器。其目的是
带有ForeignKey的Column是否自动创建索引? 还是我需要手动添加index=True? some_field = Column(Integer, ForeignKey(SomeModel.
我有一个主数据库,每个客户自己的数据库连接存储在其中。 因此,每个客户端都使用2个db:main和它自己的db,必须确定其连接 对于每个http调用。我如何使用flask-sqlalchemy扩展名执
当我仅对类进行继承时,它才起作用 class User(Base): __tablename__ = ’users’ id = Column(Integer, primary_key=
从用户的角度来看,SQLAlchemy 的查询日志似乎有点过于冗长,有时甚至有点神秘: 2015-10-02 13:51:39,500 INFO sqlalchemy.engine.base.Engi
我正在尝试使用 wtforms.ext.sqlalchemy QuerySelectMultipleField 显示复选框列表,但我无法在 GET 的表单上显示模型数据。 这是我的models.py
我想为查询返回一个中继连接。使用标准的 graphene-sqlalchemy 你可以这样做: class Query(graphene.ObjectType): node = relay.N
我在 centos 7.5 虚拟机上部署了最新的 Airflow ,并将 sql_alchemy_conn 和 result_backend 更新到 postgresql 实例上的 postgres
我想将多个项目插入到一个表中,并在发生冲突时更新该表。这是我想出的以下内容 from sqlalchemy.dialects.postgresql import insert meta = MetaD
我有以下模型: class Item(Base): a = relationship() b = relationship() c = relationship() d
我有 presto 和 superset 设置。 presto 运行良好,可以通过命令访问: ./app/hadoop/setjdk8.sh;bin/presto-cli --server http:
我一直在寻找一种在 sqlalchemy 中使用 tsvector 的方法(就像 INTEGER 等其他方法一样),但到目前为止我还不清楚如何做到这一点。我读过可以使用 UserDefinedType
我正在使用 sqlalchemy(现在使用 sqlite,但稍后可能会改变)来构建一个数据库,其中插入的顺序和 rowids 很重要。我基本上有以下几点: class Message(Base):
给定一个对象,我想知道如何知道它是否是 sqlalchemy 映射模型的实例。 通常,我会使用 isinstance(obj, DeclarativeBase)。但是,在这种情况下,我没有可用的 De
我已经通读了查询文档,如果有办法从查询中获取表名,就看不到任何地方 - 例如如果我有 q = query(Users) ,我可以得到Users从 q 退出? 最佳答案 请注意,像您这样的事件简单查询可
我不确定如何定义create schema foo迁移?我的模型如下所示(我正在使用Flask-Migrate): class MyTable(db.Model): __tablename__
我是一名优秀的程序员,十分优秀!