
Hadoop warning EBADF: Bad file descriptor


I am new to Hadoop and I am trying to write a relational join with it. The algorithm joins three relations over two consecutive rounds, using a recursive approach. The program works fine, but during execution it keeps printing warnings like this:

14/12/02 10:41:16 WARN io.ReadaheadPool: Failed readahead on ifile                                                                                                                  
EBADF: Bad file descriptor
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:263)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:142)
at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

This is annoying, and I would like to know what causes it and how to get rid of it. My code is as follows:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Recursive {
    /**
     * Join three relations together using the recursive method:
     * R JOIN S JOIN T = ((R JOIN S) JOIN T)
     */
    static String[] relationSequence; // Keeps the sequence of relations in the join
    static int round;                 // Number of the round currently running

    /**
     * Mapper
     * Relation name = R
     * Input tuple   = a b
     * Output pair   = (b, (R, a))
     * We assume that the join value is the last attribute of the first relation
     * and the first attribute of the second relation. Under this assumption the
     * map-reduce algorithm works for any number of attributes.
     */
    public static class joinMapper extends Mapper<Object, Text, IntWritable, Text> {
        public void map(Object keyIn, Text valueIn, Context context)
                throws IOException, InterruptedException {
            // Read the tuple and put its attributes into a string array
            String curValue = valueIn.toString();
            String[] values = curValue.split("\t");
            // Get the relation name from the input file name
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            // Determine the join attribute index for R join S
            int joinIndex;
            String others = "";
            if (fileName.compareTo(relationSequence[round]) == 0) {
                // Join value is the first attribute: drop it and its tab separator
                // (note: this assumes the join value is a single character)
                joinIndex = 0;
                others = curValue.substring(0 + 2);
            } else {
                // Join value is the last attribute: drop the trailing tab and value
                // (same single-character assumption as above)
                joinIndex = values.length - 1;
                others = curValue.substring(0, curValue.length() - 2);
            }
            IntWritable joinValue = new IntWritable(Integer.parseInt(values[joinIndex]));

            // Emit the list of attributes that are not the join attribute
            Text temp = new Text(fileName + "|" + others);
            context.write(joinValue, temp);
        }
    }

    /**
     * Reducer
     *
     * 1. Divide the input list into two ArrayLists based on the relation name:
     *    a. first relation
     *    b. second relation
     * 2. Check whether the second relation is empty; if it is, there is nothing
     *    to join for this key and we stop.
     * 3. Join each element of the first list with every element of the second list.
     */
    public static class joinReducer extends Reducer<IntWritable, Text, Text, Text> {
        public void reduce(IntWritable keyIn, Iterable<Text> valueIn, Context context)
                throws IOException, InterruptedException {
            ArrayList<String> firstRelation = new ArrayList<String>();
            ArrayList<String> secondRelation = new ArrayList<String>();
            for (Text value : valueIn) {
                String[] values = value.toString().split("\\|");
                if (values[0].compareTo(relationSequence[round]) == 0) {
                    secondRelation.add(values[1]);
                } else {
                    firstRelation.add(values[1]);
                }
            }
            if (secondRelation.size() > 0) {
                for (String firstItem : firstRelation) {
                    for (String secondItem : secondRelation) {
                        context.write(new Text(firstItem),
                                new Text(keyIn.toString() + "\t" + secondItem));
                    }
                }
            }
        }
    }

    /**
     * Partitioner
     *
     * To hash keys to reducer tasks we use a bitwise AND mask,
     * which is faster than the modulo operation.
     */
    public static class joinPartitioner extends Partitioner<IntWritable, Text> {
        public int getPartition(IntWritable key, Text value, int numReduceTasks) {
            int partitionNumber = key.get() & 0x007F; // keep the low 7 bits: 128 partitions
            return partitionNumber;
        }
    }

    /**
     * Main method
     *
     * (R join S join T)
     * hadoop jar ~/COMP6521.jar Recursive /input/R /input/S /input2/T /output R,S,T
     *
     * @param args
     * <br> args[0]: first relation
     * <br> args[1]: second relation
     * <br> args[2]: third relation
     * <br> args[3]: output directory
     * <br> args[4]: relation sequence to join, separated by commas
     */
    public static void main(String[] args)
            throws IllegalArgumentException, IOException, InterruptedException, ClassNotFoundException {
        long s = System.currentTimeMillis();
        /****** Preparing problem variables *******/
        relationSequence = args[4].split(","); // Keep the sequence of relations
        round = 1;                             // Current round number
        int maxOfReducers = 128;               // Maximum number of available reducers
        int noReducers;                        // Number of reducers for one particular job
        noReducers = maxOfReducers;

        Path firstRelation = new Path(args[0]);
        Path secondRelation = new Path(args[1]);
        Path thirdRelation = new Path(args[2]);
        Path temp = new Path("/temp");         // Temporary path for the intermediate result
        Path out = new Path(args[3]);
        /****** End of variable preparation *******/

        Configuration conf = new Configuration();

        /****** Configuring the first job *******/
        // General configuration
        Job job = Job.getInstance(conf, "Recursive multi-way join (first round)");
        job.setNumReduceTasks(noReducers);

        // Pass the appropriate classes
        job.setJarByClass(Recursive.class);
        job.setMapperClass(joinMapper.class);
        job.setPartitionerClass(joinPartitioner.class);
        job.setReducerClass(joinReducer.class);

        // Specify the output types of the reducers
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Specify the output types of the mappers
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(temp)) { fs.delete(temp, true); }
        if (fs.exists(out))  { fs.delete(out, true); }

        // Specify the input and output paths
        FileInputFormat.addInputPath(job, firstRelation);
        FileInputFormat.addInputPath(job, secondRelation);
        FileOutputFormat.setOutputPath(job, temp);
        /****** End of first job configuration *******/

        // Run the first job
        job.submit();
        boolean b = job.waitForCompletion(true);
        if (b) {
            // Execute the second job after completion of the first one
            round++;                                   // Advance the round number
            Configuration conf2 = new Configuration(); // Create a new configuration object

            /****** Configuring the second job *******/
            // General configuration
            Job job2 = Job.getInstance(conf2, "Reduce multi-way join (second round)");
            job2.setNumReduceTasks(noReducers);

            // Pass the appropriate classes
            job2.setJarByClass(Recursive.class);
            job2.setMapperClass(joinMapper.class);
            job2.setPartitionerClass(joinPartitioner.class);
            job2.setReducerClass(joinReducer.class);

            // Specify the output types of the reducers
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);

            // Specify the output types of the mappers
            job2.setMapOutputKeyClass(IntWritable.class);
            job2.setMapOutputValueClass(Text.class);

            // Specify the input and output paths
            FileInputFormat.addInputPath(job2, temp);
            FileInputFormat.addInputPath(job2, thirdRelation);
            FileOutputFormat.setOutputPath(job2, out);
            /****** End of second job configuration *******/

            // Run the second job
            job2.submit();
            b = job2.waitForCompletion(true);

            // Output time measurement
            long e = System.currentTimeMillis() - s;
            System.out.println("Total: " + e);
            System.exit(b ? 0 : 1);
        }
        System.exit(1);
    }

}

Best Answer

I had a similar error, ended up at your question, and then at this mailing list thread, "EBADF: Bad file descriptor":

To clarify a little bit, the readahead pool can sometimes spit out this message if you close a file while a readahead request is in flight. It's not an error and just reflects the fact that the file was closed hastily, probably because of some other bug which is the real problem.

In my case, I was closing a writer without flushing it with hflush first.
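
For reference, a minimal sketch (my own illustration, not code from the post; the path and payload are invented) of flushing an HDFS output stream with hflush before closing it, so nothing is still pending when the stream is torn down:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriterFlushSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical output path, used only for illustration
        FSDataOutputStream out = fs.create(new Path("/tmp/flush-demo.txt"));
        out.writeBytes("some data\n");
        out.hflush(); // flush the buffered data to the datanodes...
        out.close();  // ...before closing the stream
    }
}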

Since you don't seem to be using any writer or reader by hand, I would probably look at how you are submitting the MR jobs.
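
On the job-submission side, one thing worth noting (my observation, not something stated in the answer): waitForCompletion(true) already submits the job and then blocks until it finishes, so the explicit submit() calls in the driver above are redundant. A minimal sketch of the usual chained-job pattern, with the configuration calls elided and the class name invented:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ChainedJobsSketch {
    public static void main(String[] args) throws Exception {
        Job first = Job.getInstance(new Configuration(), "first round");
        // ... set jar, mapper, reducer, partitioner, input/output paths here ...
        // waitForCompletion(true) submits the job and waits for it to finish
        if (first.waitForCompletion(true)) {
            Job second = Job.getInstance(new Configuration(), "second round");
            // ... configure the second round to read the first round's output ...
            System.exit(second.waitForCompletion(true) ? 0 : 1);
        }
        System.exit(1);
    }
}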

Regarding the Hadoop warning "EBADF: Bad file descriptor", there is a similar question on Stack Overflow: https://stackoverflow.com/questions/27253616/
