gpt4 book ai didi

apache-spark - Spark 结构化流文件源起始偏移量

转载 作者:行者123 更新时间:2023-12-04 05:14:11 26 4
gpt4 key购买 nike

有没有办法为 Spark 结构化文件流源指定起始偏移量?

我正在尝试从 HDFS 流式传输 Parquet :

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.readStream
.parquet("/tmp/streaming/")
.writeStream
.option("checkpointLocation", "/tmp/streaming-test/checkpoint")
.format("parquet")
.option("path", "/tmp/parquet-sink")
.trigger(Trigger.ProcessingTime(1.minutes))
.start()

如我所见,第一次运行是处理在路径中检测到的所有可用文件,然后将偏移量保存到检查点位置并仅处理新文件,即接受年龄并且不存在于所见 map 中的文件中。

我正在寻找一种方法,如何指定起始偏移量或时间戳或选项数量,以便在第一次运行时不处理所有可用文件。

有没有我正在寻找的方法?

最佳答案

感谢@jayfah,据我所知,我们可以使用以下技巧模拟 Kafka 的“最新”起始偏移:

  1. 使用 option("latestFirst", true)option("maxFilesPerTrigger", "1") 运行警告流,并带有检查点、虚拟接收器和巨大的处理时间。这样,预热流会将最新的文件时间戳保存到检查点。

  2. 使用 option("maxFileAge", "0") 运行真实流,使用相同检查点位置的真实接收器。在这种情况下,流将只处理新可用的文件。

这很可能不是生产所必需的,并且有更好的方法,例如重新组织数据路径等,但至少我发现这种方式可以作为我问题的答案。

关于apache-spark - Spark 结构化流文件源起始偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51391722/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com