gpt4 book ai didi

apache-spark - Apache Spark/Azure Data Lake Storage - 仅处理一次文件,将文件标记为已处理

转载 作者:行者123 更新时间:2023-12-04 14:07:31 26 4
gpt4 key购买 nike

我有一个 Azure Data Lake Storage 容器,它充当 Apache Spark 处理的 JSON 文件的着陆区。

那里有数以万计的小文件(最多几 MB)。 Spark 代码定期读取这些文件并执行一些转换。

我希望文件只被读取一次并且 Spark 脚本是幂等的。如何确保文件不会被反复读取?我如何以高效的方式做到这一点?

我是这样读取数据的:

spark.read.json("/mnt/input_location/*.json")

我想到了以下方法:

  1. 使用已处理的文件名创建一个 Delta 表,并对输入 DataFrame 运行 EXCEPT 转换
  2. 将处理过的文件移动到不同的位置(或重命名)。我宁愿不那样做。如果我需要重新处理数据,我需要再次运行重命名此操作需要很长时间。

希望有更好的方法。请提出一些建议。

最佳答案

您可以使用启用检查点和 Trigger.Once 的结构化流作业。

该作业的检查点文件将跟踪作业已使用的 JSON 文件。此外,Trigger.Once 触发器将使此流式处理作业如同批处理作业一样。

Databricks 有一篇不错的文章这解释了“为什么 Streaming 和 RunOnce 比 Batch 更好”。

您的结构化流式传输作业可能如下所示:

val checkpointLocation = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_location/"
val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)

val query = streamDF
.[...] // apply your processing
.writeStream
.format("console") // change sink format accordingly
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.Once)
.start()

query.awaitTermination()

关于apache-spark - Apache Spark/Azure Data Lake Storage - 仅处理一次文件,将文件标记为已处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67302744/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com