gpt4 book ai didi

apache-spark - 将spark数据集写入hdfs时如何保证幂等?

转载 作者:行者123 更新时间:2023-12-04 10:39:36 26 4
gpt4 key购买 nike

我有一个写入 hdfs(parquet 文件)的 spark 进程。我的猜测是,默认情况下,如果 spark 出现故障并重试,它可能会写入一些文件两次(我错了吗?)。

但是,我应该怎么做才能在 hdfs 输出上获得幂等性呢?

我看到 2 种情况应该被不同地质疑(但如果你知道更好,请纠正我或开发它):

  1. 写入一项时发生故障:我猜写入已重新启动,因此如果 hdfs 上的帖子不是“原子”w.r.t 以引发写入调用,则可能会写入两次。机会有多大?
  2. 失败发生在任何地方,但由于执行 dag 的制作方式,重启将发生在几个写入任务之前的任务中(例如,我正在考虑必须在某些 groupBy 之前重启)和一些这些写入任务中的一部分已经完成。 spark 执行是否保证不会再次调用这些任务?

最佳答案

我认为这取决于您在工作中使用哪种提交者以及该提交者是否能够撤消失败的工作。例如当您使用 Apache Parquet 格式的输出时,Spark 期望提交者 Parquet 是 ParquetOutputCommitter 的子类。如果您使用此提交程序 DirectParquetOutputCommitter 来附加数据,则它无法撤消该作业。 code

如果您使用 ParquetOutputCommitter 本身,您可以 see它扩展了 FileOutputCommitter 并略微覆盖了 commitJob(JobContext jobContext) 方法。

以下内容是从Hadoop: The Definitive Guide复制/粘贴的

OutputCommitter API:setupJob() 方法在作业运行之前调用,通常用于执行初始化。对于 FileOutputCommitter,该方法创建最终输出目录,${mapreduce.output.fileoutputformat.outputdir},以及一个临时工作空间对于任务输出,_temporary 作为其下的子目录。如果作业成功,则调用 commitJob() 方法,这在默认的基于文件的实现删除临时工作空间并创建一个隐藏的空标记输出目录中名为 _SUCCESS 的文件向文件系统客户端指示该作业顺利完成。如果作业没有成功,则使用状态对象调用 abortJob()指示作业是失败还是被杀死(例如,被用户杀死)。在默认情况下实现,这将删除作业的临时工作空间。

任务级别的操作类似。 setupTask() 方法在任务已运行,默认实现不执行任何操作,因为临时在写入任务输出时创建以任务输出命名的目录。

任务的提交阶段是可选的,可以通过返回 false 来禁用需要任务提交()。这使框架不必运行分布式提交任务的协议(protocol),commitTask()abortTask() 都不会被调用。FileOutputCommitter 将在没有输出被写入时跳过提交阶段任务。

如果任务成功,调用 commitTask(),在默认实现中移动临时任务输出目录(名称中有任务尝试 ID 以避免任务尝试之间的冲突)到最终输出路径,${mapreduce.output.fileoutputformat.outputdir}。否则,框架调用abortTask(),删除临时任务输出目录。

该框架确保在针对特定任务进行多次任务尝试时,只有一个会被提交;其他的将被中止。出现这种情况可能是因为第一次尝试由于某种原因失败了——在这种情况下,它会被中止,然后,成功的尝试将被提交。如果两次任务尝试都可能发生作为推测重复同时运行;在这种情况下,第一个完成的将被提交,另一个将被中止。

关于apache-spark - 将spark数据集写入hdfs时如何保证幂等?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59987368/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com