
hadoop - Hadoop distributed cache via the generic option -files

Reposted — Author: 行者123 · Updated: 2023-12-02 21:40:20

While going through the book Hadoop in Action, I came across an option stating that, instead of adding small files to the distributed cache programmatically, this can also be done with the -files generic option.

When I try this in the setup() of my code, I get a FileNotFoundException at fs.open(), and it shows me a path that I am not sure about.

The question is:
If I use the -files generic option, where in HDFS does the file get copied to by default?

The code I am trying to execute is below.

import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapSide2 extends Configured implements Tool {

    /* Program     : JoinMapSide2.java
       Description : Passing the small file via GenericOptionsParser
                     hadoop jar JoinMapSide2.jar -files orders.txt .........
       Input       : /data/patent/orders.txt (local file system), /data/patent/customers.txt
       Output      : /MROut/JoinMapSide2
       Date        : 23/03/2015
    */

    protected static class MapClass extends Mapper<Text, Text, NullWritable, Text> {

        // hash table to store the key+value pairs from the distributed file (the background data)
        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // setup() runs once per mapper task and populates joinData before any map() call
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            String line;
            String[] tokens;

            Text buffer = new Text();

            // get the configuration and the file system associated with it
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);

            // get all the cache files distributed as part of the job
            URI[] localFiles = context.getCacheFiles();

            System.out.println("Cache File Path: " + localFiles[0].toString());

            // check if there are any distributed files;
            // in our case we are sure there will always be one, so use that only
            if (localFiles.length > 0) {
                // since the file is now on HDFS, use an FSDataInputStream to read through it
                FSDataInputStream fdis = fs.open(new Path(localFiles[0].toString()));
                LineReader joinReader = new LineReader(fdis);

                // read the file until EOF
                try {
                    while (joinReader.readLine(buffer) > 0) {
                        line = buffer.toString();
                        // apply the split pattern only once
                        tokens = line.split(",", 2);
                        // add the key+value pair into the hash table
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                    fdis.close();
                }
            } else {
                System.err.println("No cache files were distributed");
            }
        }

        // map function
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

            String joinValue = joinData.get(key.toString());

            if (joinValue != null) {
                context.write(NullWritable.get(), new Text(key.toString() + "," + value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        if (args.length < 2) {
            System.err.println("Usage: JoinMapSide2 -files <smallFile> <inputFile> <outputFile>");
            return 1;
        }

        Path inFile = new Path(args[0]);   // input file (customers.txt)
        Path outFile = new Path(args[1]);  // output directory

        Configuration conf = getConf();
        // delimiter for the input file
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        Job job = Job.getInstance(conf, "Map Side Join2");

        // this is not used, as the small file is distributed to all the nodes in the
        // cluster through the generic options parser
        // job.addCacheFile(disFile.toUri());

        FileInputFormat.addInputPath(job, inFile);
        FileOutputFormat.setOutputPath(job, outFile);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setJarByClass(JoinMapSide2.class);
        job.setMapperClass(MapClass.class);

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);

        System.exit(ret);
    }
}

This is the exception I see in the trace:
Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
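A side note on the failing path: the trailing `#orders.txt` in the exception is the symlink-name fragment that `-files` appends to each cache URI, and passing the URI's string form straight into `new Path(...)` keeps that fragment as a literal part of the path. A minimal sketch of separating the path from the fragment with `java.net.URI` (the staging path is the one from the trace, shown for illustration only):

```java
import java.net.URI;

public class CacheUriFragment {
    public static void main(String[] args) throws Exception {
        // a cache URI as returned by context.getCacheFiles(), with its "#symlink" fragment
        URI cached = new URI(
            "/tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt");

        // getPath() excludes the fragment; getFragment() is the symlink name
        System.out.println("path     = " + cached.getPath());
        System.out.println("fragment = " + cached.getFragment());
    }
}
```

Whether the fragment-free path would then resolve still depends on the file actually being in the staging area, so this only explains the shape of the path in the trace, not the root cause.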

I start the job like this:
hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2

Any direction would be very helpful. Thanks.

Best Answer

First, you need to move orders.txt to HDFS, and you have to use -files
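The answer above is terse, so for context: with `-files`, Hadoop typically symlinks each distributed file into the task's working directory under its base name, so inside `setup()` it can usually be read as a plain local file rather than through `fs.open()`. A minimal sketch of that local read, mirroring the split-once parsing in the question (the `loadJoinData` helper name is mine, not from the post, and the sample data in `main` is made up):

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Hashtable;

public class LocalCacheRead {

    // Parse "key,value" lines from a local file into a hash table,
    // splitting on the first comma only (same as the setup() logic above).
    static Hashtable<String, String> loadJoinData(String localPath) throws IOException {
        Hashtable<String, String> joinData = new Hashtable<String, String>();
        BufferedReader reader = new BufferedReader(new FileReader(localPath));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] tokens = line.split(",", 2);
                if (tokens.length == 2) {
                    joinData.put(tokens[0], tokens[1]);
                }
            }
        } finally {
            reader.close();
        }
        return joinData;
    }

    public static void main(String[] args) throws IOException {
        // stand-in for the orders.txt that "-files" would symlink into the task directory;
        // inside a real setup() one would just use new File("orders.txt")
        File sample = File.createTempFile("orders", ".txt");
        PrintWriter pw = new PrintWriter(sample);
        pw.println("o100,c001,228.90");
        pw.println("o101,c004,25.00");
        pw.close();

        Hashtable<String, String> joinData = loadJoinData(sample.getAbsolutePath());
        System.out.println("loaded " + joinData.size() + " join records"); // loaded 2 join records
        sample.delete();
    }
}
```

Reading the symlinked copy avoids touching the staging path from the exception entirely; the asker's original `fs.open()` approach only works if the URI it is given points at a path that actually exists on HDFS.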

Regarding hadoop - Hadoop distributed cache via the generic option -files, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29215659/
