gpt4 book ai didi

apache-spark - 如何向 Delta Lake 表添加新列?

转载 作者:行者123 更新时间:2023-12-04 00:54:36 24 4
gpt4 key购买 nike

我正在尝试向在 Azure Blob 存储中存储为增量表的数据添加一个新列。对数据执行的大多数操作都是 upserts,有很多更新和很少的新插入。我写数据的代码目前看起来像这样:

DeltaTable.forPath(spark, deltaPath)
.as("dest_table")
.merge(myDF.as("source_table"),
"dest_table.id = source_table.id")
.whenNotMatched()
.insertAll()
.whenMatched(upsertCond)
.updateExpr(upsertStat)
.execute()
来自 these docs ,看起来 Delta Lake 支持在 insertAll() 上添加新列和 updateAll()只打电话。但是,我仅在满足某些条件并希望将新列添加到所有现有数据时才进行更新(默认值为 null )。
我想出了一个看起来非常笨拙的解决方案,我想知道是否有更优雅的方法。这是我目前提出的解决方案:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")

// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")

// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)

最佳答案

首先更改增量表,然后执行合并操作:

from pyspark.sql.functions import lit

spark.read.format("delta").load('/mnt/delta/cov')\
.withColumn("Recovered", lit(''))\
.write\
.format("delta")\
.mode("overwrite")\
.option("overwriteSchema", "true")\
.save('/mnt/delta/cov')

关于apache-spark - 如何向 Delta Lake 表添加新列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63528754/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com