作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在我们的数据管道中,我们从数据源中摄取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹。
然后定期运行 Spark 作业,将这些“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获取上游快照的最新版本。
在此合并逻辑期间:
1) 我们将“增量数据”加载为 DataFrame df1
2) 将当前的“快照表”加载为 DataFrame df2
3) 合并 df1 和 df2 去重 ids 并采用最新版本的行(使用 update_timestamp 列)
这个逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,这取决于数据库,这可能非常大。
我注意到在 Delta Lake 中,使用以下代码完成了类似的操作:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
最佳答案
1) How does merge/upsert internally works? Does it load entire "updatedDF" and
"/data/events/" into Spark memory?
merge
工作。任务可以在不同的 Spark 执行器等上运行。
2) If not, does it apply the delta changes something similar to Apache Hudi ?
Delta
看起来像版本化的 Parquet 文件。
3) During deduplication how this upsert logic knows to take the latest version of a record?
Because I don't see any setting to specify the "update timestamp" column?
AS OF
引用较旧的快照语法 - 见
关于apache-spark - 三角洲湖 : How does upsert internally work?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59476892/
我是一名优秀的程序员,十分优秀!