
java - reduce fails due to Task attempt failed to report status for 600 seconds. Killing! Solution?


The reduce phase of the job fails with:

Failed reduce tasks exceed allowed limit.

The reason each individual task fails is:

Task attempt_201301251556_1637_r_000005_0 failed to report status for 600 seconds. Killing!

Problem in detail:

The map phase takes in records of the format: time, rid, data.

The data consists of data elements and their counts.

e.g.: a,1 b,4 c,7 is the data of one record.

For each data element, the mapper outputs the data of the whole record. e.g.:

key: (time, a,), val: (rid, data)
key: (time, b,), val: (rid, data)
key: (time, c,), val: (rid, data)

Each reduce receives all the data associated with the same key, from all records. e.g.: key: (time, a), val: (rid1, data) and key: (time, a), val: (rid2, data) reach the same reduce instance.

It does some processing here and outputs the similar rids.

My program runs without problems for a small dataset such as 10MB, but fails with the error above when the data grows to 1GB. I have no idea why this happens. Please help!
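For reference, here is a minimal sketch of the map side described above, using the same old org.apache.hadoop.mapred API as the reducer below. The class name VCLMap and the exact input parsing are assumptions reconstructed from the description, not the original code:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class VCLMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable offset, Text record, OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        // Assumed input line: "time, rid, a,1 b,4 c,7" (limit 3 keeps the data field intact)
        String[] fields = record.toString().split(",", 3);
        String time = fields[0].trim();
        String rid = fields[1].trim();
        String[] pairs = fields[2].trim().split("\\s+"); // "a,1", "b,4", "c,7"

        // Re-serialize the data as "a|1|b|4|c|7|", the format the reducer parses
        StringBuilder pipe = new StringBuilder();
        for (String pair : pairs) {
            String[] ec = pair.split(",");
            pipe.append(ec[0]).append('|').append(ec[1]).append('|');
        }
        String value = rid + " " + pipe;

        // One output pair per data element, keyed by (time, element)
        for (String pair : pairs) {
            String element = pair.split(",")[0];
            output.collect(new Text(time + " " + element), new Text(value));
        }
    }
}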

Reduce code:

There are two classes below:

  • VCLReduce0Split
  • CoreSplit

a. VCLReduce0Split

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                       Reporter reporter) throws IOException {

        // The key is "(time, element)"; the first token is the time
        String key_str = key.toString();
        StringTokenizer stk = new StringTokenizer(key_str);
        String t = stk.nextToken();

        // rid -> "total_size,element|freq|element|freq|..."
        HashMap<String, String> hmap = new HashMap<String, String>();

        while (values.hasNext()) {
            String val = values.next().toString();
            StringTokenizer st = new StringTokenizer(val);

            String uid = st.nextToken();
            String data = st.nextToken();

            int total_size = 0;

            // data is "element|freq|element|freq|..."
            StringTokenizer stx = new StringTokenizer(data, "|");
            StringBuffer sbuf = new StringBuffer();

            while (stx.hasMoreTokens()) {
                String data_part = stx.nextToken();
                String data_freq = stx.nextToken();

                sbuf.append(data_part).append("|").append(data_freq).append("|");
                total_size += Integer.parseInt(data_freq); // accumulate the record's total count
            }

            StringBuffer sbuf1 = new StringBuffer();
            sbuf1.append(String.valueOf(total_size));
            sbuf1.append(",");
            sbuf1.append(sbuf);

            hmap.put(uid, sbuf1.toString());
        }

        float threshold = 0.8f;

        CoreSplit obj = new CoreSplit();
        ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold);

        for (int i = 0; i < al.size(); i++) {
            CustomMapSimilarity cmaps = al.get(i);
            String xy_pair = cmaps.getRIDPair();
            String similarity = cmaps.getSimilarity();
            output.collect(new Text(xy_pair), new Text(similarity));
        }
    }
}

b. CoreSplit

package com.a;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.commons.collections.map.MultiValueMap;

public class CoreSplit {


    public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String, String> hmap, float t) {

        ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>();
        ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap.keySet().iterator();

        // Inverted index: prefix token -> CustomMapPrefixTokens(RID, position)
        MultiValueMap index = new MultiValueMap();

        String RID;
        int size;
        float prefix_size;
        HashMap<String, Float> alpha;
        HashMap<String, CustomMapOverlap> hmap_overlap;
        String data;

        while (iter.hasNext()) {
            RID = iter.next();

            // Stored value is "size,element|freq|element|freq|..."
            String data_val = hmap.get(RID);
            StringTokenizer st = new StringTokenizer(data_val, ",");
            String RIDsize = st.nextToken();
            size = Integer.parseInt(RIDsize);
            data = st.nextToken();

            String[] parts = data.split("\\|");

            // Prefix length for similarity threshold 0.8
            prefix_size = (float) (size - (0.8 * size) + 1);
            if (size == 1) {
                prefix_size = 1;
            }

            alpha = new HashMap<String, Float>();
            hmap_overlap = new HashMap<String, CustomMapOverlap>();

            int pi = 0;

            for (float j = 0; j <= prefix_size; j++) {

                boolean prefix_chk = false;
                String ptoken = parts[pi];
                float val = Float.parseFloat(parts[pi + 1]);
                float temp_j = j;
                j = j + val;
                boolean j_l = false;
                float prefix_contri = 0;
                pi = pi + 2;

                if (j > prefix_size) {
                    // Only part of this token's count falls inside the prefix
                    prefix_contri = prefix_size - temp_j;
                    if (prefix_contri > 0) {
                        j_l = true;
                        prefix_chk = false;
                    } else {
                        prefix_chk = true;
                    }
                }

                if (!prefix_chk) {
                    filters(index, ptoken, RID, hmap, t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri);

                    CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID, j);
                    index.put(ptoken, cmapt);
                }
            }

            als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap);

            for (int i = 0; i < als.size(); i++) {
                if (als.get(i).getRIDPair() != null) {
                    alsim.add(als.get(i));
                }
            }
        }

        return alsim;
    }


    public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t,
            int size, float val, boolean j_l, HashMap<String, Float> alpha,
            HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri) {

        @SuppressWarnings("unchecked")
        ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken);

        if ((positions_list != null) && (positions_list.size() != 0)) {

            CustomMapPrefixTokens cmapt;
            String y;
            int y_size = 0;
            float check_size = 0;
            float RID_val = 0;
            float y_overlap = 0;
            float ubound = 0;
            ArrayList<Float> fl = new ArrayList<Float>();
            StringTokenizer st;

            for (int k = 0; k < positions_list.size(); k++) {
                cmapt = positions_list.get(k);

                if (!cmapt.getRID().equals(RID)) {

                    y = hmap.get(cmapt.getRID());
                    st = new StringTokenizer(y, ",");
                    y_size = Integer.parseInt(st.nextToken());

                    // Length filter: candidate must have at least 0.8 * size elements
                    check_size = (float) 0.8 * (size);

                    if (y_size >= check_size) {

                        String y_data = st.nextToken();
                        StringTokenizer st1 = new StringTokenizer(y_data, "|");

                        // Find this token's frequency in the candidate record
                        while (st1.hasMoreTokens()) {
                            String token = st1.nextToken();
                            if (token.equals(ptoken)) {
                                String nxt_token = st1.nextToken();
                                RID_val = (float) Integer.parseInt(nxt_token);
                                break;
                            }
                        }

                        float alpha1 = (float) (0.8 / 1.8) * (size + y_size);

                        fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l, RID_val, val, prefix_contri);

                        ubound = fl.get(0);
                        y_overlap = fl.get(1);

                        positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap);
                    }
                }
            }
        }
    }


    public void positionFilter(float ubound, float alpha1, CustomMapPrefixTokens cmapt, float y_overlap,
            HashMap<String, CustomMapOverlap> hmap_overlap) {

        float y_overlap_total = 0;

        if (null != hmap_overlap.get(cmapt.getRID())) {

            y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap();

            if ((y_overlap_total + ubound) >= alpha1) {
                // Candidate can still reach the overlap threshold: accumulate
                CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID());
                float y_o_t = y_overlap + y_overlap_total;
                cmap_tmp.setOverlap(y_o_t);
                hmap_overlap.put(cmapt.getRID(), cmap_tmp);
            } else {
                // Prune: reset the accumulated overlap to zero
                float n = 0;
                hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(), n));
            }

        } else {
            CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(), y_overlap);
            hmap_overlap.put(cmapt.getRID(), cmap_tmp);
        }
    }

    public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j,
            HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri) {

        alpha.put(cmapt.getRID(), alpha1);

        // Remaining weight after the current position in each record
        float min1 = y_size - cmapt.getPosition();
        float min2 = size - j;
        float min = (min1 < min2) ? min1 : min2;

        if (j_l) {
            val = prefix_contri;
        }

        // Overlap contributed by this token: min of the two frequencies
        float y_overlap = (RID_val < val) ? RID_val : val;

        float ubound = y_overlap + min;

        ArrayList<Float> fl = new ArrayList<Float>();
        fl.add(ubound);
        fl.add(y_overlap);

        return fl;
    }


    public ArrayList<CustomMapSimilarity> calcSimilarity(String time, String RID, HashMap<String, String> hmap,
            HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap) {

        float jaccard = 0;

        ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap_overlap.keySet().iterator();

        while (iter.hasNext()) {
            String key = iter.next();
            CustomMapOverlap val = hmap_overlap.get(key);
            float overlap = val.getOverlap();

            if (overlap > 0) {
                String yRID = val.getRID();
                String RIDpair = RID + " " + yRID;

                // Verify surviving candidates with an exact Jaccard computation
                jaccard = unionIntersection(hmap, RIDpair);

                if (jaccard > 0.8) {
                    CustomMapSimilarity cms = new CustomMapSimilarity(time + " " + RIDpair, String.valueOf(jaccard));
                    alsim.add(cms);
                }
            }
        }

        return alsim;
    }


    public float unionIntersection(HashMap<String, String> hmap, String RIDpair) {

        StringTokenizer st = new StringTokenizer(RIDpair);
        String xRID = st.nextToken();
        String yRID = st.nextToken();

        String xdata = hmap.get(xRID);
        String ydata = hmap.get(yRID);

        int total_union = 0;
        int xval = 0;
        int yval = 0;
        int part_union = 0;
        int total_intersect = 0;

        // Stored values are "size,element|freq|element|freq|..."; skip the size field
        StringTokenizer xtokenizer = new StringTokenizer(xdata, ",");
        StringTokenizer ytokenizer = new StringTokenizer(ydata, ",");
        xtokenizer.nextToken();
        ytokenizer.nextToken();

        String datax = xtokenizer.nextToken();
        String datay = ytokenizer.nextToken();

        HashMap<String, Integer> x = new HashMap<String, Integer>();
        HashMap<String, Integer> y = new HashMap<String, Integer>();

        String[] xparts = datax.split("\\|");
        String[] yparts = datay.split("\\|");

        // Build element -> count maps; parts alternate element, count
        for (int i = 0; i < xparts.length - 1; i += 2) {
            x.put(xparts[i], Integer.parseInt(xparts[i + 1]));
        }

        for (int i = 0; i < yparts.length - 1; i += 2) {
            y.put(yparts[i], Integer.parseInt(yparts[i + 1]));
        }

        Set<String> xset = x.keySet();
        Set<String> yset = y.keySet();

        // Weighted union/intersection over the elements of x
        for (String elm : xset) {
            xval = x.get(elm);
            part_union = 0;
            int part_intersect = 0;

            if (yset.contains(elm)) {
                yval = y.get(elm);
                if (xval > yval) {
                    part_union = xval;
                    part_intersect = yval;
                } else {
                    part_union = yval;
                    part_intersect = xval;
                }
                total_intersect = total_intersect + part_intersect;
            } else {
                part_union = xval;
            }

            total_union = total_union + part_union;
        }

        // Elements that appear only in y contribute to the union
        for (String elm : yset) {
            if (!xset.contains(elm)) {
                total_union = total_union + y.get(elm);
            }
        }

        float jaccard = (float) total_intersect / total_union;

        return jaccard;
    }

}

Best Answer

The timeout is most likely caused by a long-running computation in your reducer that does not report progress back to the Hadoop framework. This can be solved using a few different approaches:

I. Increase the timeout in mapred-site.xml:

<property>
  <name>mapred.task.timeout</name>
  <value>1200000</value>
</property>

The default value is 600000 ms = 600 seconds.
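If you cannot (or would rather not) change the cluster-wide configuration, the same property can be set per job. A minimal sketch using the old JobConf API; the driver class name MyDriver is a placeholder:

import org.apache.hadoop.mapred.JobConf;

// Per-job override of the task liveness timeout (milliseconds)
JobConf conf = new JobConf(MyDriver.class);
conf.setLong("mapred.task.timeout", 1200000L); // 20 minutes instead of the default 10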

II. Report progress every x records, as in the Reducer example in the javadoc:

public void reduce(K key, Iterator<V> values,
                   OutputCollector<K, V> output,
                   Reporter reporter) throws IOException {
    // noValues is the running count of values processed so far;
    // reporting progress every 10 values resets the task timeout
    if ((noValues % 10) == 0) {
        reporter.progress();
    }

    // ...
}
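Applied to the reducer from the question, that could look like the following sketch. The key point is that the expensive similarityCalculation call must also signal liveness, for example by passing the Reporter down into CoreSplit; the counting variable here is an assumption:

public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                   Reporter reporter) throws IOException {
    int noValues = 0;
    while (values.hasNext()) {
        Text val = values.next();
        noValues++;
        if ((noValues % 10) == 0) {
            reporter.progress(); // heartbeat while consuming a possibly huge value list
        }
        // ... build hmap as before ...
    }
    // For the long similarity computation, either call reporter.progress()
    // periodically inside CoreSplit's loops, or here between chunks of work.
}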

Optionally, you can increment a custom counter, as in the example:

reporter.incrCounter(NUM_RECORDS, 1);
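With the old API the counter argument is an enum constant, and incrementing a counter also counts as progress toward the liveness check. A minimal sketch; the enum and constant names are placeholders:

// Inside the reducer class:
static enum Counters { NUM_RECORDS }

// Inside reduce(), once per processed record:
reporter.incrCounter(Counters.NUM_RECORDS, 1);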

Regarding "java - reduce fails due to Task attempt failed to report status for 600 seconds. Killing! Solution?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/15281307/
