- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
当 reducer 达到 67% 时,我们会收到超时异常,我认为这是在排序阶段之后和 reduce 阶段之前。请告知我们应该寻找哪些参数来解决问题。
16/06/15 16:58:13 INFO mapreduce.Job: map 100% reduce 0%
16/06/15 16:58:23 INFO mapreduce.Job: map 100% reduce 24%
16/06/15 16:59:05 INFO mapreduce.Job: map 100% reduce 28%
16/06/15 16:59:08 INFO mapreduce.Job: map 100% reduce 30%
16/06/15 16:59:39 INFO mapreduce.Job: map 100% reduce 33%
16/06/15 17:00:09 INFO mapreduce.Job: map 100% reduce 52%
16/06/15 17:00:12 INFO mapreduce.Job: map 100% reduce 67%
16/06/15 17:05:42 INFO mapreduce.Job: Task Id : attempt_1465992294703_0001_r_000000_2, Status : FAILED
驱动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CSVLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CSVNLineInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ExchgLogsTransposeDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
@SuppressWarnings("deprecation")
Configuration conf = getConf();
String outPath=null;
String inPath=null;
if(args==null ||args.length==0){
inPath="C:\\HadoopWS\\infile\\";
outPath="C:\\HadoopWS\\outfile\\";
}else{
inPath=args[0];
outPath=args[1];
}
Path output =new Path(outPath);
Path input =new Path(inPath);
FileSystem hdfs = FileSystem.get(conf);
if (hdfs.exists(output)) {
hdfs.delete(output, true);
}
conf.set(CSVLineRecordReader.FORMAT_DELIMITER, "\"");
conf.set(CSVLineRecordReader.FORMAT_SEPARATOR, ",");
conf.setInt(CSVNLineInputFormat.LINES_PER_MAP, 500000);
conf.setBoolean(CSVLineRecordReader.IS_ZIPFILE, false);
Job job = new Job(conf);
job.setJarByClass(ExchgLogsTransposeDriver.class);
job.setMapperClass(ExchgLogsMapper.class);
job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(CompositeWritable.class);
// job.setNumReduceTasks(2);
job.setMapSpeculativeExecution(true);
job.setPartitionerClass(ActualKeyPartitioner.class);
job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setReducerClass(ExchgLogsReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CompositeWritable.class);
job.getConfiguration().set("mapreduce.output.basename", input.getName());
job.getConfiguration().set("mapreduce.map.output.compress", "true");
// job.getConfiguration().set("mapreduce.map.output.compress.codec", "com.hadoop.compression.lzo.LzoCodec");
job.setInputFormatClass(CSVNLineInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.addInputPath(job, new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String args[]) throws Exception {
System.exit(ToolRunner.run(new ExchgLogsTransposeDriver(), args));
}
}
reducer 类
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ExchgLogsReducer extends Reducer<CompositeKey, CompositeWritable, NullWritable, Text> {
Log log = LogFactory.getLog(ExchgLogsReducer.class);
public static final String NEW = "NEW";
public static final String FW = "FW";
public static final String RE = "RE";
public static final int ZERO = 0;
Text res = new Text();
@Override
public void reduce(CompositeKey key, Iterable<CompositeWritable> value, Context context)
throws IOException, InterruptedException {
List<CompositeValueObj> cache = new ArrayList<CompositeValueObj>();
StringBuilder response = new StringBuilder();
Iterator<CompositeWritable> it = value.iterator();
while (it.hasNext()) {
CompositeWritable currWritable = new CompositeWritable();
currWritable = it.next();
CompositeValueObj obj = new CompositeValueObj();
obj.setRecepient((currWritable.getRecepient().toString()));
obj.setSender(currWritable.getSender().toString());
obj.setType(currWritable.getType().toString());
obj.setTimestamp(currWritable.getTimestamp().toString());
cache.add(obj);
// System.out.println(new Text(" "+"\t" + obj.getRecepient() + "\t"
// + obj.getSender() + "\t" +obj.getType()+ "\t" +
// obj.getTimestamp()));
}
for (int i = 0; i < cache.size(); i++) {
CompositeValueObj currobj = cache.get(i);
String receiver = currobj.getRecepient().toString();
String origSender = currobj.getSender().toString();
String dateFrom = currobj.getTimestamp().toString();
System.out.println(key.getSubject() + " " + "i==>" + i + cache.size());
for (int j = i + 1; j < cache.size(); j++) {
response = new StringBuilder(key.getSubject()).append(",").append(receiver).append(",");
CompositeValueObj nextObj = cache.get(j);
System.out.println(key.getSubject() + " " + "j==>" + j);
String dateTo = nextObj.getTimestamp().toString();
String newSender = nextObj.getSender().toString();
String newRecepient = nextObj.getRecepient().toString();
String mailType = nextObj.getType().toString();
// System.out.println(mailType+ "==>"+receiver+
// "==>"+newRecepient);
if (receiver.equals(newRecepient)) {
response.append(origSender).append(",");
response.append("N,0,0,").append(dateFrom);
break;
}
if (receiver.equals(newSender) && ((mailType.equals(RE) || (mailType.equals(FW))))) {
if (mailType.equals(RE)) {
response.append(origSender).append(",");
response.append("Y,");
response.append(getTimeDiff(dateFrom, dateTo));
response.append(",0,").append(dateTo);
break;
}
if (mailType.equals(FW)) {
response.append(origSender).append(",");
response.append("Y,0,");
response.append(getTimeDiff(dateFrom, dateTo));
response.append(",").append(dateTo);
break;
}
} else {
response.append(origSender).append(",");
response.append("N,0,0,").append(dateFrom);
}
}
if (i + 1 == cache.size()) {
response = new StringBuilder(key.getSubject()).append(",").append(receiver).append(",");
response.append(origSender).append(",");
response.append("N,0,0,").append(dateFrom);
}
res.set(response.toString());
// System.err.println(key.getSubject()+new
// Text(response.toString()));
context.write(NullWritable.get(), res);
}
}
private static double getTimeDiff(String date1, String date2) {
double diff = 0;
double weekend = 0;
boolean isWEchain=false;
boolean isWESent=false;
if (date1 == null || date2 == null) {
return 0;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
try {
Date from = sdf.parse(date1);
Date to = sdf.parse(date2);
Calendar cal1 = Calendar.getInstance();
Calendar cal2 = Calendar.getInstance();
cal1.setTime(from);
cal2.setTime(to);
int noOfDaysWE = 0;
System.out.println(cal1.get(Calendar.DAY_OF_WEEK));
System.out.println(cal2.get(Calendar.DAY_OF_WEEK));
if ((((Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK))
|| (Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK)))
&& ((Calendar.FRIDAY == cal2.get(Calendar.DAY_OF_WEEK))
|| (Calendar.SATURDAY == cal2.get(Calendar.DAY_OF_WEEK))))
) {
isWEchain =true;
}else if((((Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK))
|| (Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK)))
&& (((Calendar.FRIDAY != cal2.get(Calendar.DAY_OF_WEEK))
&& (Calendar.SATURDAY != cal2.get(Calendar.DAY_OF_WEEK)))))){
isWESent=true;
if(Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)){
cal1.add(Calendar.DATE, 1);
}
cal1.set(Calendar.HOUR,20);
cal1.set(Calendar.MINUTE,0);
cal1.set(Calendar.SECOND,0);
cal1.set(Calendar.MILLISECOND,0);
}
System.out.println(cal1.getTime());
System.out.println(cal2.getTime());
System.out.println(isWESent);
diff=cal2.getTimeInMillis() - cal1.getTimeInMillis();
if(diff < 0 ){
return 0;
}
while (cal1.before(cal2)) {
if ((Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK))
|| (Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK))) {
noOfDaysWE++;
}
cal1.add(Calendar.DATE, 1);
}
if (noOfDaysWE != 0) {
weekend = TimeUnit.DAYS.toMillis(noOfDaysWE);
}
if(isWEchain && (noOfDaysWE <= 2)){
return 0;
}
System.out.println(diff);
diff = diff - weekend;
} catch (ParseException e) {
return 0;
}
if (diff != 0)
return diff / 1000;
else
return 0;
}
public static void main(String[] a) {
System.out.println(getTimeDiff("2016-06-03T19:41:48.781Z", "2016-06-05T07:21:01.000Z"));
}
}
最佳答案
请查看 mapred.task.timeout
是 mapred-site.xml
中的毫秒。
修改属性后,需要重启所有的tranckers。(job + task)
提示:如果您想在运行时打印所有配置以检查是否已应用,请使用驱动程序中的以下代码片段。例如:
final JobConf conf = new JobConf(config, this.getClass());
try {
conf.writeXml(System.out);
} catch (final IOException e) {
e.printStackTrace();
}
关于hadoop - Mapreduce - 当 reducer 达到 67% 时超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37836944/
我正在处理一个处理大量数据的项目,所以我最近发现了 MapReduce,在我进一步深入研究之前,我想确保我的期望是正确的。 与数据的交互将通过 Web 界面进行,因此响应时间在这里至关重要,我认为 1
我正在阅读有关 Hadoop 以及它的容错性的文章。我阅读了 HDFS 并阅读了如何处理主节点和从节点的故障。但是,我找不到任何提及 mapreduce 如何执行容错的文档。特别是,当包含 Job T
我正在尝试在我的 Ubuntu 桌面上使用最新的 Hadoop 版本 2.6.0、Java SDK 1.70 来模拟 Hadoop 环境。我用必要的环境参数配置了 hadoop,它的所有进程都已启动并
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我只是想针对我们正在做的一些数据分析工作来评估 HBase。 HBase 将包含我们的事件数据。键为 eventId + 时间。我们想要对日期范围内的几种事件类型 (4-5) 进行分析。事件类型总数约
是否有一种快速算法可以在 MapReduce 框架上运行以从巨大的整数集中查找中位数? 最佳答案 我会这样做。这是顺序快速选择的一种并行版本。 (某些映射/归约工具可能不会让您轻松完成任务...) 从
我正在尝试对大型分布式数据集执行一些数值计算。该算法非常适合 MapReduce 模型,具有以下附加属性:与输入数据相比,映射步骤的输出尺寸较小。数据可以被视为只读,并且静态分布在节点上(故障转移时的
假设我在 RavenDb 中有给定的文档结构 public class Car { public string Manufacturer {get;set;} public int B
我刚刚开始使用 mongo 和 map/reduce,在使用 pymongo 时我遇到了以下错误,而在直接使用 mongo 命令行时我没有得到(我意识到有一个类似的问题这个,但我的似乎更基本)。 我直
*基本上我正在尝试按过去一小时内的得分对对象进行排序。 我正在尝试为我的数据库中的对象生成每小时投票总和。投票嵌入到每个对象中。对象架构如下所示: { _id: ObjectId sc
我们怎样才能使我们的 MapReduce 查询更快? 我们使用五节点 Riak 数据库集群构建了一个应用程序。 我们的数据模型由三个部分组成:比赛、联赛和球队。 比赛包含联赛和球队的链接: 型号 va
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 6 年前。
有没有什么方法可以在运行时获取应用程序 ID - 例如 - 带有 yarn 的 wordcount 示例命令? 我希望使用 yarn 从另一个进程启 Action 业命令,并通过 YARN REST
如何在Hadoop Map-reduce程序中使用机器学习算法?我想使用分类算法、决策树、聚类算法。除了 Mahout 之外,请提出一些想法。 最佳答案 您可以编写自己的MapReduce程序,并在m
虽然 MapReduce 可能不是实现图像处理中使用的算法的最佳方式,但出于好奇,如果我作为初学者尝试使用它们,这将是最简单的实现方式。 最佳答案 Hadoop 非常适合处理大量 IO。因此,例如,您
我只是想验证我对这些参数及其关系的理解,如果我错了请通知我。 mapreduce.reduce.shuffle.input.buffer.percent 告诉分配给 reducer 的整个洗牌阶段的内
HBase 需要 mapreduce/yarn,还是只需要 hdfs? 对于 HBase 的基本用法,例如创建表、插入数据、扫描/获取数据,我看不出有任何理由使用 mapreduce/yarn。 请帮
我问了一些关于提高 Hive 查询性能的问题。一些答案与映射器和化简器的数量有关。我尝试了多个映射器和化简器,但在执行过程中没有发现任何差异。不知道为什么,可能是我没有以正确的方式去做,或者我错过了别
我是 mapreduce 和 hadoop 的新手。我阅读了 mapreduce 的示例和设计模式... 好的,我们可以进入正题了。我们正在开发一种软件,可以监控系统并定期捕获它们的 CPU 使用
我正在使用 Microsoft MapReduce SDK 启动仅 Mapper 作业。 调用 hadoop.MapReduceJob.ExecuteJob 立即抛出“响应状态代码不表示成功:404(
我是一名优秀的程序员,十分优秀!