gpt4 book ai didi

python - 如何将 PySpark 数据框插入到具有雪花模式的数据库中?

转载 作者:行者123 更新时间:2023-12-04 15:25:42 24 4
gpt4 key购买 nike

我正在使用 PySpark 计算一个数据框,如果这个数据库有一个 snowflake schema,我该如何将这个数据框附加到我的数据库中? ?

如何指定拆分数据框的方式,以便将类似 CSV 的数据放入多个联合表中?

我的问题并不特定于 Pyspark,同样的问题也可以问到 pandas。

最佳答案

将从 CSV 中提取的数据帧附加到由雪花模式组成的数据库:

  1. 从雪花模式中提取数据。
  2. 从外部数据源提取新数据。
  3. 合并两个数据集。
  4. 将组合转换为一组维度表和事实表以匹配雪花模式。
  5. 将转换后的数据帧加载到数据库中,覆盖现有数据。

例如对于具有以下架构的数据框,从外部源中提取:

StructType([StructField('customer_name', StringType()),
StructField('campaign_name', StringType())])
def entrypoint(spark: SparkSession) -> None:
extracted_customer_campaigns = extract_from_external_source(spark)

existing_customers_dim, existing_campaigns_dim, existing_facts = (
extract_from_snowflake(spark))

combined_customer_campaigns = combine(existing_campaigns_dim,
existing_customers_dim,
existing_facts,
extracted_customer_campaigns)

new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(
combined_customer_campaigns)

load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)


def combine(campaigns_dimension: DataFrame,
customers_dimension: DataFrame,
facts: DataFrame,
extracted_customer_campaigns: DataFrame) -> DataFrame:
existing_customer_campaigns = facts.join(
customers_dimension,
on=['customer_id']).join(
campaigns_dimension, on=['campaign_id']).select('customer_name',
'campaign_name')

combined_customer_campaigns = extracted_customer_campaigns.union(
existing_customer_campaigns).distinct()

return combined_customer_campaigns


def transform_to_snowflake(customer_campaigns: DataFrame) -> (
DataFrame, DataFrame):
customers_dim = customer_campaigns.select(
'customer_name').distinct().withColumn(
'customer_id', monotonically_increasing_id())

campaigns_dim = customer_campaigns.select(
'campaign_name').distinct().withColumn(
'campaign_id', monotonically_increasing_id())

facts = (
customer_campaigns.join(customers_dim,
on=['customer_name']).join(
campaigns_dim, on=[
'campaign_name']).select('customer_id', 'campaign_id'))

return campaigns_dim, customers_dim, facts

这是一种简单的函数式方法。可以通过编写增量来优化,而不是为每个 ETL 批处理重新生成雪花键。

此外,如果提供了一个单独的外部 CSV 包含要删除的记录,则可以类似地提取它,然后在转换之前从组合数据框中减去,以删除那些现有记录。

最后,问题仅涉及附加到表格。如果需要合并/更新,则需要手动添加其他步骤 Spark itself does not support it .

关于python - 如何将 PySpark 数据框插入到具有雪花模式的数据库中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62303471/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com