gpt4 book ai didi

python - psycopg2 中的 COPY 命令

转载 作者:行者123 更新时间:2023-11-28 16:23:56 27 4
gpt4 key购买 nike

我有一个进程正在从 4 个数据库读取数据,每个数据库有 4 个表。我将该数据整合​​到 1 个 postgres 数据库中,总共有 4 个表。 (原来的4个数据库各有4张相同的表,需要合并)。

我现在使用的方法是使用 pandas。我一次从所有 4 个数据库中读取一个表,将数据连接到一个数据框中,然后使用 to_sql 将其保存在我的 postgres 数据库中。然后我遍历其余数据库,对其他表执行相同的操作。

我的问题是速度。我的一张表每个日期大约有 1 - 200 万行,因此可能需要大约 5,000 - 6,000 秒才能完成将数据写入 postgres。将其写入 .csv 文件然后在 pgadmin 中使用 COPY FROM 会更快。

这是我当前的代码。请注意,有一些函数调用,但它基本上只是指表名。我还完成了一些基本的日志记录,但这不是太必要。我正在为源数据库添加一列,但这是必需的。我从实际上是字符串的字段中剥离 .0,但 pandas 也将它们视为 float ,我用 0 填充空白整数并确保列确实是 int 类型。

def query_database(table, table_name, query_date):
df_list = []
log_list = []
for db in ['NJ', 'NJ2', 'LA', 'NA']:
start_time = time.clock()
query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S')
engine_name = '{}{}{}{}'.format(connection_type, server_name, '/', db)
print('Accessing {} from {}'.format((select_database(db)[0][table]), engine_name))
engine = create_engine(engine_name)
df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params={query_date})
query_end = time.clock() - start_time
df['source_database'] = db
df['insert_date_utc'] = query_timestamp
df['row_count'] = df.shape[0]
df['column_count'] = df.shape[1]
df['query_time'] = round(query_end, 0)
df['maximum_id'] = df['Id'].max()
df['minimum_id'] = df['Id'].min()
df['source_table'] = table_dict.get(table)
log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy()
df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1)
df_list.append(df)
log_list.append(log)
log = pd.concat(log_list)
log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last')
result = pd.concat(df_list)
result.drop_duplicates('Id', inplace=True)
cols = [i.strip() for i in (create_columns(select_database(db)[0][table]))]
result = result[cols]
print('Creating string columns for {}'.format(table_name))
for col in modify_str_cols(select_database(db)[0][table]):
create_string(result, col)
print('Creating integer columns for {}'.format(table_name))
for col in modify_int_cols(select_database(db)[0][table]):
create_int(result, col)
log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes)
print('Inserting {} data into PostgreSQL'.format(table_name))
result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table]))

如何在其中插入 COPY TO 和 COPY FROM 以加快速度?我应该只编写 .csv 文件然后循环处理这些文件,还是可以从内存复制到我的 postgres?

最佳答案

psycopg2 提供了一些特定的copy 相关的api。如果要使用 csv,则必须使用 copy_expert(它允许您指定完整的 copy 语句)。

通常当我这样做时,我会使用 copy_expert() 和一个类似文件的对象,它循环访问磁盘上的文件。这似乎相当有效。

这就是说,在您的情况下,我认为 copy_tocopy_from 更匹配,因为它只是 postgres 到 postgres 的传输。注意这些使用 PostgreSQL 的复制输出/输入语法而不是 csv(如果你想使用 csv,你必须使用 copy_expert)

注意在你决定如何做事情之前,你需要注意:

copy_to 复制到类文件对象(例如StringIO)和从类文件对象copy_from/copy_expert 文件。如果你想使用 Pandas 数据框,你将不得不稍微考虑一下,要么创建一个类似文件的对象,要么使用 csv 以及 StringIOcopy_expert 生成内存中的 csv 并加载它。

关于python - psycopg2 中的 COPY 命令,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38123049/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com