gpt4 book ai didi

apache-spark - 三角洲湖 : How does upsert internally work?

转载 作者:行者123 更新时间:2023-12-03 23:49:40 25 4
gpt4 key购买 nike

在我们的数据管道中,我们从数据源中摄取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹。

然后定期运行 Spark 作业,将这些“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获取上游快照的最新版本。

在此合并逻辑期间:

1) 我们将“增量数据”加载为 DataFrame df1

2) 将当前的“快照表”加载为 DataFrame df2

3) 合并 df1 和 df2 去重 ids 并采用最新版本的行(使用 update_timestamp 列)

这个逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,这取决于数据库,这可能非常大。

我注意到在 Delta Lake 中,使用以下代码完成了类似的操作:

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ... // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()

在这里,“updatesDF”可以被认为是我们来自 CDC 源的“增量数据”。

我的问题 :

1) 内部合并/更新插入是如何工作的?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中?

2) 如果不是,它是否应用了类似于 Apache Hudi 的增量更改?

3) 在重复数据删除期间,这个 upsert 逻辑如何知道采用最新版本的记录?因为我没有看到任何指定“更新时间戳”列的设置?

最佳答案

   1) How does merge/upsert internally works? Does it load entire "updatedDF" and 
"/data/events/" into Spark memory?

不,Spark 不需要加载它需要更新到内存中的整个 Delta DF。
否则它将无法扩展。
它采用的方法与 Spark 所做的其他工作非常相似 - 如果数据集足够大(或者您云创建显式分区),则整个表被透明地拆分为多个分区。然后为每个分区分配一个任务,组成您的 merge工作。任务可以在不同的 Spark 执行器等上运行。
   2) If not, does it apply the delta changes something similar to Apache Hudi ?

我听说过 Apache Hudi,但没看过。
内部, Delta看起来像版本化的 Parquet 文件。
对表的更改以有序的原子单元形式存储,称为提交。
保存表格时 - 查看它有哪些文件 - 它将有文件
像 000000.json、000001.json 等,它们中的每一个都会引用一个
子目录中底层 Parquet 文件的一组操作。例如,
000000.json 会说这个版本及时引用了 Parquet 文件 001
和 002 和 000001.json 会说这个版本及时不应该引用
这两个旧的 Parquet 文件,并且只使用 Parquet 文件 003。
   3) During deduplication how this upsert logic knows to take the latest version of a record? 
Because I don't see any setting to specify the "update timestamp" column?

默认情况下,它引用最新的变更集。
时间戳是在 Delta 中实现此版本控制的内部方式。
您可以通过 AS OF 引用较旧的快照语法 - 见
https://docs.databricks.com/delta/delta-batch.html#syntax

关于apache-spark - 三角洲湖 : How does upsert internally work?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59476892/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com