hadoop - (Hadoop) MapReduce - Chained jobs - JobControl does not stop

Reposted. Author: 可可西里. Updated: 2023-11-01 14:12:49

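I need to chain two MapReduce jobs, so I used JobControl to make job2 depend on job1. It works and the output files are created, but the program never stops! In the shell it just stays like this: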

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1

How can I make it stop? Here is my main method.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Configuration conf2 = new Configuration();

    Job job1 = new Job(conf, "canzoni");
    job1.setJarByClass(CanzoniOrdinate.class);
    job1.setMapperClass(CanzoniMapper.class);
    job1.setReducerClass(CanzoniReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    ControlledJob cJob1 = new ControlledJob(conf);
    cJob1.setJob(job1);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));

    Job job2 = new Job(conf2, "songsort");
    job2.setJarByClass(CanzoniOrdinate.class);
    job2.setMapperClass(CanzoniSorterMapper.class);
    job2.setSortComparatorClass(ReverseOrder.class);
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    job2.setReducerClass(CanzoniSorterReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
    FileOutputFormat.setOutputPath(job2, new Path(args[1]));

    JobControl jobctrl = new JobControl("jobctrl");
    jobctrl.addJob(cJob1);
    jobctrl.addJob(cJob2);
    cJob2.addDependingJob(cJob1);
    jobctrl.run();

    ////////////////
    // NEW CODE ///
    //////////////

    // delete jobctrl.run();
    Thread t = new Thread(jobctrl);
    t.start();
    String oldStatusJ1 = null;
    String oldStatusJ2 = null;
    while (!jobctrl.allFinished()) {
        String status = cJob1.toString();
        String status2 = cJob2.toString();
        if (!status.equals(oldStatusJ1)) {
            System.out.println(status);
            oldStatusJ1 = status;
        }
        if (!status2.equals(oldStatusJ2)) {
            System.out.println(status2);
            oldStatusJ2 = status2;
        }
    }
    System.exit(0);
}
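Why it hangs: in the `org.apache.hadoop.mapreduce.lib.jobcontrol` API, `JobControl.run()` is a polling loop that keeps checking its job lists (sleeping between passes) and only returns after `stop()` has been called; it does not return just because every job has finished. The `System.exit(0)` in the new code works around this by killing the JVM. The pure-Java sketch below models that loop with a hypothetical `MiniJobControl` class (not a Hadoop class, and far simpler than the real one) so the behaviour can be seen without a cluster: `allFinished()` becomes true, yet the thread running `run()` stays alive until `stop()` is called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for Hadoop's JobControl (NOT the real class).
// It "finishes" one job per polling pass, then keeps polling until stop() is
// called, which is the loop shape that keeps the program in the question alive.
class MiniJobControl implements Runnable {
    private final int totalJobs;
    private final long pollMillis;
    private final AtomicInteger finishedJobs = new AtomicInteger();
    private final AtomicBoolean running = new AtomicBoolean(true);

    MiniJobControl(int totalJobs, long pollMillis) {
        this.totalJobs = totalJobs;
        this.pollMillis = pollMillis;
    }

    boolean allFinished() {
        return finishedJobs.get() >= totalJobs;
    }

    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        while (running.get()) {
            if (!allFinished()) {
                finishedJobs.incrementAndGet(); // pretend a job completed
            }
            // allFinished() becoming true does NOT end this loop; only stop() does.
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniJobControl control = new MiniJobControl(2, 10);
        Thread runner = new Thread(control);
        runner.start();

        while (!control.allFinished()) {
            Thread.sleep(10);
        }
        Thread.sleep(100);
        System.out.println("all finished, runner alive: " + runner.isAlive()); // prints true

        control.stop();
        runner.join();
        System.out.println("after stop(), runner alive: " + runner.isAlive()); // prints false
    }
}
```

The same shape applies to the real class: once polling sees `allFinished()` return true, calling `stop()` lets `run()` return so the program can exit normally instead of via `System.exit`.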

Best answer

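I basically did what Pietro suggested above: run the JobControl on its own thread and poll it until all jobs have finished.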

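import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Thin Runnable wrapper so JobControl.run() executes on its own thread.
// (JobControl itself implements Runnable, so new Thread(control) works too.)
public class JobRunner implements Runnable {
    private final JobControl control;

    public JobRunner(JobControl control) {
        this.control = control;
    }

    @Override
    public void run() {
        // Returns only after control.stop() is called, not when the jobs finish.
        this.control.run();
    }
}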

In my map/reduce class I have:

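public void handleRun(JobControl control) throws InterruptedException {
    JobRunner runner = new JobRunner(control);
    Thread t = new Thread(runner);
    t.start();

    // Poll every 5 seconds until every job has completed (successfully or not).
    while (!control.allFinished()) {
        System.out.println("Still running...");
        Thread.sleep(5000);
    }
    // JobControl.run() keeps looping until stopped; without this call the
    // runner thread stays alive and the program never exits.
    control.stop();
}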

I just pass the JobControl object into it.
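The polling pattern from the answer can be packaged as a small reusable helper that sleeps between checks and always stops the runner afterwards. In the sketch below, `JobControlWaiter` and `runAndWait` are hypothetical names, not a Hadoop API; the helper takes the three operations it needs as functional interfaces, so it does not depend on Hadoop classes directly. It assumes only that the JobControl object is a `Runnable` and exposes `allFinished()` and `stop()`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Hadoop API): runs a JobControl-style Runnable on
// its own thread, polls until every job is finished, then stops the runner
// and waits for the thread to exit.
final class JobControlWaiter {
    private JobControlWaiter() {}

    static void runAndWait(Runnable control,
                           BooleanSupplier allFinished,
                           Runnable stop,
                           long pollMillis) throws InterruptedException {
        Thread runner = new Thread(control, "jobcontrol-runner");
        runner.start();
        try {
            // Sleep between checks instead of busy-spinning.
            while (!allFinished.getAsBoolean()) {
                Thread.sleep(pollMillis);
            }
        } finally {
            // Ask the runner loop to exit; without this it polls forever.
            stop.run();
        }
        runner.join();
    }
}
```

With Hadoop on the classpath, the end of `main` in the question could then become `JobControlWaiter.runAndWait(jobctrl, jobctrl::allFinished, jobctrl::stop, 5000);` followed by `System.exit(jobctrl.getFailedJobList().isEmpty() ? 0 : 1);`, so the process also reports whether any job failed. Passing method references keeps the helper testable with a fake controller.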

A similar question about hadoop - (Hadoop) MapReduce - Chained jobs - JobControl does not stop can be found on Stack Overflow: https://stackoverflow.com/questions/12374928/
