gpt4 book ai didi

pyspark - Databricks 中的 StreamingQuery 增量表 - 描述历史

转载 作者:行者123 更新时间:2023-12-05 05:43:20 31 4
gpt4 key购买 nike

我有一个 Delta 表,我正在读取它作为 StreamingQuery。

使用 DESCRIBE History 查看增量表历史,我看到 99% 的 OperationMetrics 表明 numTargetRowsUpdates 为 0,大多数操作都是插入。但是,偶尔会有 2-3 个 numTargetRowsUpdates > 1。Delta 表上的操作不过是合并。

我是否仍可以使用 StreamingQuery 并将此数据作为流读取,否则我会出错吗?。即:

df: DataFrame = spark \
.readStream \
.format("delta") \
.load(f"{table_location}") \

df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", f "{checkpoint}/{table_location}")\
.trigger(once=True) \
.foreachBatch(process_batch) \
.start()

现在我有另一个 Delta 表,它更像是客户信息的维度表,即电子邮件、姓名、上次上线时间等。我最初将其作为附加的 StreamingQuery 阅读,但出现以下错误:java.lang.UnsupportedOperationException: Detected a data update

查看此表,在描述历史记录中,我看到发生了许多更新。问题:如果我将 StreamQuery 与 IgnoreChanges, True 一起使用,这是否会将更新的记录作为新记录发送,我可以在 foreachBatch 中进一步处理?

最佳答案

如果增量源中有更新或删除,读取流将抛出异常。这从 databricks documentation: 中也很清楚。

Structured Streaming does not handle input that is not an append andthrows an exception if any modifications occur on the table being usedas a source.

如果您使用 IgnoreChanges, True,它不会抛出异常,但会为您提供更新的行 + 可能已经处理过的行。这是因为增量表中的所有内容都发生在文件级别。例如,如果您更新文件中的一行(大致),将发生以下情况:

  1. 查找并读取包含要更新的记录的文件
  2. 写一个新文件,其中包含更新的记录 + 旧文件中的所有其他数据
  3. 在事务日志中将旧文件标记为已删除,并将新文件标记为已添加
  4. 您的读取流会将整个新文件读取为"new"记录。这意味着您可以在自己的 Steam 中获得重复项。

文档中也提到了这一点。

ignoreChanges: re-process updates if files had to be rewritten in thesource table due to a data changing operation such as UPDATE, MERGEINTO, DELETE (within partitions), or OVERWRITE. Unchanged rows maystill be emitted, therefore your downstream consumers should be ableto handle duplicates. ...

您必须决定这是否适合您的用例。如果您需要专门处理更新和删除数据 block 提供 Change Data Feed ,您可以在增量表上启用它。这会为您提供有关插入、追加和删除的行级详细信息(以一些额外的存储和 IO 为代价)。

关于pyspark - Databricks 中的 StreamingQuery 增量表 - 描述历史,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71866652/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com