gpt4 book ai didi

delta-lake - 如何比较两个版本的 delta 表以获得类似于 CDC 的更改?

转载 作者:行者123 更新时间:2023-12-04 11:47:30 62 4
gpt4 key购买 nike

如果我想使用 delta time-travel 来比较两个版本以获得类似于 CDC 的更改,该怎么做?

我可以看到两个选项:

  • 在 SQL 中,您有 EXCEPT/MINUS 查询,您可以将所有数据与另一个表进行比较。我假设您也可以使用它,对吗?但是,如果您比较的版本越来越大并且您总是需要将所有版本与最新版本的所有行进行比较,那么速度是否足够快?
  • Delta 是否对每行进行某种散列并且可以非常快地完成,或者这对于 delta 来说是否非常耗时?


  • 发现于 slack

    最佳答案

    您可以计算表的两个版本的差异,但正如您猜测的那样,这样做的成本很高。当增量表有除追加之外的变化时,计算实际差异也很棘手。

    通常当人们问到这个问题时,他们试图设计自己的系统,让他们对从 delta 到某个地方的数据进行一次处理; Spark 流 + Delta 源已经存在来做到这一点

    如果你想自己写,你可以直接读取事务日志(协议(protocol)规范在 https://github.com/delta-io/delta/blob/master/PROTOCOL.md )并使用你正在计算的两个版本之间的操作来找出哪些文件有更改要读取

    请注意,增量表的版本被缓存(由 Spark 保存),因此比较不同的数据集应该相当便宜。

    val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
    val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
    // v0 and v1 are persisted - see Storage tab in web UI

    获得那些 v0 和 v1 并不昂贵;比较两者既昂贵又棘手。如果表是仅追加的,则为 (v1 - v0);如果它有 upsert,那么你也必须处理 (v0 - v1),如果它有元数据或协议(protocol)更改,它会变得更加棘手。

    当你自己完成所有这些逻辑时,它与重新实现 DeltaSource 非常相似。

    然后你可以考虑以下几点:
    val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
    val v0 = log.getSnapshotAt(0)
    val actionsAtV0 = v0.state

    val v1 = log.getSnapshotAt(1)
    val actionsAtV1 = v1.state
    actionsAtV0actionsAtV1是将 delta 表分别带到版本 0 和 1 的所有操作,可以认为是 delta 表的 CDC。

    这基本上是读取事务日志,除了使用一些 Delta 的内部 API 使其更容易。

    关于delta-lake - 如何比较两个版本的 delta 表以获得类似于 CDC 的更改?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59591536/

    62 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com