gpt4 book ai didi

java - 在一个驱动程序中运行依赖的 hadoop 作业

转载 作者:可可西里 更新时间:2023-11-01 15:41:31 25 4
gpt4 key购买 nike

我目前有两个 hadoop 作业,其中第二个作业需要将第一个作业的输出添加到分布式缓存中。目前我手动运行它们,所以在第一个作业完成后,我将输出文件作为参数传递给第二个作业,它的驱动程序将它添加到缓存中。

第一个作业只是一个简单的 map 作业,我希望在依次执行两个作业时可以运行一个命令。

谁能帮我把第一个作业的输出放到分布式缓存中,以便它可以传递到第二个作业中?

谢谢

编辑:这是作业 1 的当前驱动程序:

public class PlaceDriver {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: PlaceMapper <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Place Mapper");
job.setJarByClass(PlaceDriver.class);
job.setMapperClass(PlaceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

这是 job2 的驱动程序。作业 1 的输出作为第一个参数传递给作业 2 并加载到缓存中

public class LocalityDriver {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: LocalityDriver <cache> <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Job Name Here");
DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration());
job.setNumReduceTasks(1); //TODO: Will change
job.setJarByClass(LocalityDriver.class);
job.setMapperClass(LocalityMapper.class);
job.setCombinerClass(TopReducer.class);
job.setReducerClass(TopReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最佳答案

在同一个 main 中创建两个作业对象。让第一个等待完成,然后再运行另一个。

public class DefaultTest extends Configured implements Tool{


public int run(String[] args) throws Exception {

Job job = new Job();

job.setJobName("DefaultTest-blockx15");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);

job.setNumReduceTasks(15);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setJarByClass(DefaultTest.class);

job.waitForCompletion(true):

job2 = new Job();

// define your second job with the input path defined as the output of the previous job.


return 0;
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
ToolRunner.run(new DefaultTest(), otherArgs);
}
}

关于java - 在一个驱动程序中运行依赖的 hadoop 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10309939/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com