gpt4 book ai didi

java - 如果 hadoop mapreduce 中至少有一个拆分失败,如何停止处理其他拆分或文件本身

转载 作者:可可西里 更新时间:2023-11-01 16:37:31 25 4
gpt4 key购买 nike

我有一个很大的 CSV 文件,比如 2GB(或者说 10k 行)要使用 map-reduce 进行处理。知道每个 block 是 128MB,我有 16 个 block ,因此有 16 个拆分。因为它是一个文本文件,所以我可以让多个映射器同时处理文件的不同部分(不同的拆分)。映射器计数默认为 4,因此并行处理文件的 4 个部分

要求是,如果至少有一次拆分失败,我不想进一步处理文件,或者不想将文件内容写入输出文件夹。也就是说,如果 CSV 的至少一行导致错误,我不希望该文件被进一步处理,也不需要它的输出。 (这是因为我可以纠正错误并重新运行它)

我该怎么做?

我扩展了 FileInputFormat 以覆盖 isSplittable 方法并返回 true。如果我返回 false,我知道只有一个映射器会处理该文件 - 但我担心网络传输过多会碰巧无法完全处理该文件。

我尝试了 Counter,但问题是我如何让其他映射器知道有人设置了计数器值,因为某些事情失败了

任何指示都会有所帮助。

最佳答案

映射器彼此独立,它们之间不可能直接通信。也就是说,mapper 没有直接的方式将故障传达给其他 mapper。

如果您关心的是清理,您可以在驱动程序类中收集提交时的作业状态。

boolean done = job.waitForCompletion(true);

如果作业失败,则 done 的值为 false。如果它失败了(因为其中一个映射器抛出了异常);只需清理输出目录,或以编程方式修复 CSV 并重新运行。

编辑 - 基于 OP 的评论

The job is set to run till the end and finish gracefully. That is, all exceptions are caught and logged, and thus mappers are not killed. This means, job's finished state will always be SUCCESSFUL. This is done to make sure that mapper won't fail at bad files, but proceed to process good ones, till the last one.

在这种情况下,您可以使用计数器(在映射器中)来增加失败的计数。

context.getCounter("my_group", "bad_record").increment(1);

作业完成后,只需在驱动程序类中获取计数器值,如果计数为正,则将进程标记为失败。

long value= job.getCounters().getGroup("my_group").findCounter("bad_record").getValue();
  • 请注意上面代码中的 Null 检查,以防没有任何记录是错误的并且计数器根本不存在。

编辑 - 添加另一个可能的选项

通过下面的代码,可以从上下文中获取作业实例并尝试在映射器中获取计数器值(并停止进一步处理文件)。但我不确定,如果计数器在执行期间实际上可用,还是仅在执行结束之后可用。

Configuration conf = context.getConfiguration();
Cluster cluster = new Cluster(conf);
Job currentJob = cluster.getJob(context.getJobID());
long val=currentJob.getCounters().findCounter("bad_record").getValue();

我没有测试过。请尝试让我知道。

关于java - 如果 hadoop mapreduce 中至少有一个拆分失败,如何停止处理其他拆分或文件本身,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49015840/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com