gpt4 book ai didi

python - 在 python 雪花连接器中使用合并,以 pandas 数据框作为源

转载 作者:行者123 更新时间:2023-12-05 02:31:24 25 4
gpt4 key购买 nike

我正在从 API 检索数据并将数据转换为 Pandas 数据框。我正在使用 python-snowflake 连接器将此数据作为表发送到我的雪花模式中。

我想使用合并而不是将重复数据发送到我的雪花表中。

我从 API 检索的示例数据:

|------------|-------------|------------|
| log_in_ID | user_id | date |
|------------|-------------|------------|
| 1 | 21 | 02/21/2021 |
| 2 | 22 | 02/24/2021 |
| 3 | 23 | 02/27/2021 |
| 4 | 21 | 02/29/2021 |
|------------|-------------|------------|

log_in_ID 是唯一的

这是我的代码:

import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(URL(
account='my_snowflake_account',
user='user',
password='password',
database='my_database'
schema='my_schema',
warehouse='warehouse',
role='ADMIN'))

pandas_df = 'Some code to get API data and convert into pandas dataframe'

def send_to_snowflake(pandas_df, target_table):
connect = engine.connect()
data.tosql(target_table, con=engine, index=False, if_exists='append')
connection.close()
engine.dispose()

if __name__ == "__main__":
send_to_snowflake(pandas_df, target_table)

如何使用合并语句将 log_in_id 作为唯一键?

如何在 snowflake-python 的合并查询中使用 pandas 数据框?

merge into target_table using {pandas_dataframe}
on target_table.log_in_id = {pandas_dataframe}.log_in_id
when matched then
update set target_table.user_id = {pandas_dataframe}.user_id and
set target_table.date = {pandas_dataframe}.date

最佳答案

如果您的 API 结构类似于以下格式:[(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]

此代码将用于将 API 数据合并到雪花目标表中,而无需将源数据加载到表中:

import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL


def sample_func():
engine = create_engine(URL(
account='xxx',
user='xxx',
password='xxx',
database='xxx',
schema='PUBLIC',
warehouse='COMPUTE_WH',
role='xxx',
))

connection = engine.connect()
pandas_df = 'select * from A'

try:
cursor_return = connection.execute(pandas_df)
cursor_result = cursor_return.fetchall()
api_data = str(cursor_result)[1:-1]
print(api_data)
merge_temp = """
merge into B target_table using (select COLUMN1,COLUMN2,COLUMN3 from values{0}) src
on target_table.log_in_id = src.COLUMN1
when matched then
update set target_table.log_in_id = src.COLUMN1,
target_table.user_id = src.COLUMN2,
target_table.test_data = src.COLUMN3
when not matched then
insert
(log_in_id, user_id, test_data) values(src.COLUMN1, src.COLUMN2, src.COLUMN3)
""".format(str(api_data))
print(merge_temp)
c_return = connection.execute(merge_temp)
c_result = c_return.fetchall()
print(c_result)

print("Number rows inserted: {0} || Number of rows updated: {1}".format(str(c_result[0][0]), str(c_result[0][1])))
finally:
connection.close()
engine.dispose()

sample_func()

但我建议将您的 API 数据加载到临时表中并在临时表上使用合并语句,这种方法比从数据帧或 csv 文件加载它更快。

关于python - 在 python 雪花连接器中使用合并,以 pandas 数据框作为源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71549603/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com