- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 结合使用,以降低通过连接和过滤几个不同的表创建的数据集。 documentation表明这应该是可能的。
(下面的示例不包括任何连接,因为它们不需要复制问题。)
我构建我的连接字符串,创建一个 SQLAlchemy 引擎和对应于我的数据库中的一个表的表。 (我正在使用 PostgreSQL。)
import dask.dataframe as dd
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.sql import select
username = 'username'
password = 'password'
server = 'prod'
database = 'my_db'
connection_string = f'postgresql+psycopg2://{username}:{password}@{server}/{database}'
engine = create_engine(connection_string)
metadata = MetaData()
t = Table('my_table', metadata,
Column('id'),
schema='my_schema')
我能够构建一个选择并将其与 SQLAlchemy 一起使用,没有任何问题
>>> s = select([t]).limit(5)
>>> rp = engine.execute(s)
>>> rp.fetchall()
[(3140757,), (3118225,), (3156070,), (3193075,), (3114614,)]
我还可以将 SQLAlchey select 提供给 panda 的 read_sql,效果很好
>>> pd.read_sql(s, connection_string)
id
0 3140757
1 3118225
2 3156070
3 3193075
4 3114614
但是,当我将相同的选择传递给 dask 时,我得到了一个 ProgrammingError。它表明 dask 正在调头并调用 pandas.read_sql,所以你会认为它应该可以工作,但显然有些事情不是。
>>> dd.read_sql_table(s, connection_string, index_col='id')
---------------------------------------------------------------------------
ProgrammingError Traceback (most recent call last)
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1192 parameters,
-> 1193 context)
1194 except BaseException as e:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
508 def do_execute(self, cursor, statement, parameters, context=None):
--> 509 cursor.execute(statement, parameters)
510
ProgrammingError: subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id
^
HINT: For example, FROM (SELECT ...) [AS] foo.
The above exception was the direct cause of the following exception:
ProgrammingError Traceback (most recent call last)
<ipython-input-5-0db95e60f442> in <module>
----> 1 dd.read_sql_table(s, connection_string, index_col='id')
C:\miniconda3\envs\my_env\lib\site-packages\dask\dataframe\io\sql.py in read_sql_table(table, uri, index_col, divisions, npartitions, limits, columns, bytes_per_chunk, head_rows, schema, meta, engine_kwargs, **kwargs)
116 # derrive metadata from first few rows
117 q = sql.select(columns).limit(head_rows).select_from(table)
--> 118 head = pd.read_sql(q, engine, **kwargs)
119
120 if head.empty:
C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_sql(sql, con, index_col, coerce_float, params, parse_dates, columns, chunksize)
395 sql, index_col=index_col, params=params,
396 coerce_float=coerce_float, parse_dates=parse_dates,
--> 397 chunksize=chunksize)
398
399
C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_query(self, sql, index_col, coerce_float, parse_dates, params, chunksize)
1061 args = _convert_params(sql, params)
1062
-> 1063 result = self.execute(*args)
1064 columns = result.keys()
1065
C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in execute(self, *args, **kwargs)
952 def execute(self, *args, **kwargs):
953 """Simple passthrough to SQLAlchemy connectable"""
--> 954 return self.connectable.execute(*args, **kwargs)
955
956 def read_table(self, table_name, index_col=None, coerce_float=True,
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, statement, *multiparams, **params)
2073
2074 connection = self.contextual_connect(close_with_result=True)
-> 2075 return connection.execute(statement, *multiparams, **params)
2076
2077 def scalar(self, statement, *multiparams, **params):
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object, *multiparams, **params)
946 raise exc.ObjectNotExecutableError(object)
947 else:
--> 948 return meth(self, multiparams, params)
949
950 def _execute_function(self, func, multiparams, params):
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\sql\elements.py in _execute_on_connection(self, connection, multiparams, params)
267 def _execute_on_connection(self, connection, multiparams, params):
268 if self.supports_execution:
--> 269 return connection._execute_clauseelement(self, multiparams, params)
270 else:
271 raise exc.ObjectNotExecutableError(self)
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_clauseelement(self, elem, multiparams, params)
1058 compiled_sql,
1059 distilled_params,
-> 1060 compiled_sql, distilled_params
1061 )
1062 if self._has_events or self.engine._has_events:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1198 parameters,
1199 cursor,
-> 1200 context)
1201
1202 if self._has_events or self.engine._has_events:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1411 util.raise_from_cause(
1412 sqlalchemy_exception,
-> 1413 exc_info
1414 )
1415 else:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
263 exc_type, exc_value, exc_tb = exc_info
264 cause = exc_value if exc_value is not exception else None
--> 265 reraise(type(exception), exception, tb=exc_tb, cause=cause)
266
267 if py3k:
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
246 value.__cause__ = cause
247 if value.__traceback__ is not tb:
--> 248 raise value.with_traceback(tb)
249 raise value
250
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1191 statement,
1192 parameters,
-> 1193 context)
1194 except BaseException as e:
1195 self._handle_dbapi_exception(
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
507
508 def do_execute(self, cursor, statement, parameters, context=None):
--> 509 cursor.execute(statement, parameters)
510
511 def do_execute_no_params(self, cursor, statement, context=None):
ProgrammingError: (psycopg2.ProgrammingError) subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id
^
HINT: For example, FROM (SELECT ...) [AS] foo.
[SQL: 'SELECT id \nFROM (SELECT my_schema.my_table.id AS id \nFROM my_schema.my_table \n LIMIT %(param_1)s) \n LIMIT %(param_2)s'] [parameters: {'param_1': 5, 'param_2': 5}] (Background on this error at: http://sqlalche.me/e/f405)
最佳答案
正如 Chris 在另一个答案中所说的那样,Dask 将您的查询包装成某种形式 SELECT columns FROM (yourquery)
,这对于 PostgreSQL 来说是无效的语法,因为它需要一个带括号的别名表达。无需重新实现整个 read_sql_table
方法,只需将 .alias('somename')
添加到您的选择即可为表达式添加别名,即
select([t]).limit(5).alias('foo')
当被 Dask 包装时,该表达式会为 Postgres 生成正确的语法
SELECT columns FROM (yourquery) AS foo
关于python - 使用 SQLAlchemy 表达式时出现 Dask read_sql_table 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54819271/
我正在尝试通过 python 访问我的 Oracle 11g(r2) Express Edition 数据库。我特别想从其中一个表 HISTORY_FULLNESS 创建一个 Pandas DataF
我正在尝试使用 SQL 和 Pandas。遵循 sql queries 的 pandas 指南以及相关的sqlalchemy engine specification guide from sqlal
我正在尝试将我的 Postgres 数据库中的表格读入 Python。表大约有 800 万行和 17 列,在数据库中的大小为 622MB。 我可以使用 psql 将整个表导出到 csv,然后使用 pd
我有一个使用 Pandas 的 read_sql,它工作正常。但是,当我尝试使用相同的逻辑在 Dask 下重新创建相同的数据框时。它给了我 NoSuchTableError。我确定该表存在于我的 SQ
尝试以下代码 alerts = df.read_sql_table('alerts', db_url, index_col='id', npartitions=16) 我收到以下错误: TypeErr
我使用了 MariaDB 服务器,并尝试连接并将数据放入 python 中的 pandas 数据帧中。 MariaDB 如下所示: CREATE DATABASE `fhem` DEFAULT CHA
我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 结合使用,以降低通过连接和过滤几个不同的表创建的数据集。 documentation表明这应该是可能的。 (下
我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 结合使用,以降低通过连接和过滤几个不同的表创建的数据集。 documentation表明这应该是可能的。 (下
我正在尝试从 postgres 中的表创建 dask 数据框。我想将 application_name = 'myapp' 作为标准传递用于监视和跟踪数据库事件。 但是当我尝试添加参数时,出现以下错误
我正在尝试使用 dask 中的 read_sql_table,但我遇到了一些与 index_col 参数相关的问题。我的 sql 表没有任何数值,我不知道要给 index_col 参数什么。 我在文档
我正在尝试使用以下代码从 PostgreSQL 表获取 DataFrame: import pandas from sqlalchemy.engine import create_engine eng
我正在使用df = dd.read_sql_table('mytable_name', 'connection_string',npartitions=10, index_col='id')创建一个
我是一名优秀的程序员,十分优秀!