gpt4 book ai didi

hadoop - 在 Hadoop 0.20 中使用分布式缓存进行复制连接

转载 作者:行者123 更新时间:2023-12-02 21:57:30 26 4
gpt4 key购买 nike

我一直在尝试在集群和 Karmasphere 界面上使用分布式缓存(DistributedCache)进行复制连接。代码粘贴在下面。我的程序在缓存中找不到文件。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// A demonstration of Hadoop's DistributedCache tool
//

/**
 * Map-only job that joins its input records against a small lookup file
 * shipped to the tasks through Hadoop's {@link DistributedCache}.
 *
 * <p>Command-line arguments, in order:
 * <ol>
 *   <li>the lookup file to register in the distributed cache
 *       ({@code key,value} per line);</li>
 *   <li>the input path;</li>
 *   <li>the output path.</li>
 * </ol>
 *
 * <p>NOTE(review): the cache file URI presumably has to live on the file
 * system shared with the job (e.g. HDFS) rather than on the submitting
 * machine's local disk - confirm against the DistributedCache documentation
 * for the Hadoop version in use.
 */
public class MapperSideJoinWithDistributedCache extends Configured implements Tool {

    /**
     * Mapper that loads the first distributed-cache file into memory and
     * emits only those input records whose key is present in that table.
     */
    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

        /** In-memory join table: join key -> remainder of the lookup line. */
        private final Hashtable<String, String> joinData = new Hashtable<String, String>();

        /**
         * Loads the join table from the first localized cache file.
         *
         * @param conf the job configuration used to locate the cache files
         * @throws RuntimeException if the cache file cannot be read; failing
         *         here prevents the job from "succeeding" with an empty join
         *         table and therefore empty output
         */
        @Override
        public void configure(JobConf conf) {
            try {
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
                // Arrays.toString prints the actual paths instead of the
                // array's identity hash ("[Lorg.apache.hadoop.fs.Path;@...").
                System.out.println("ds" + java.util.Arrays.toString(cacheFiles));
                if (cacheFiles != null && cacheFiles.length > 0) {
                    loadJoinData(cacheFiles[0]);
                } else {
                    System.out.println("joinreader not set" );
                }
            } catch (IOException e) {
                // Preserve the cause so the task log shows which file was missing.
                throw new RuntimeException("Exception reading DistributedCache: " + e, e);
            }
        }

        /**
         * Reads {@code key,value} lines from the given local file into
         * {@link #joinData}. Lines without a comma are skipped.
         */
        private void loadJoinData(Path cacheFile) throws IOException {
            BufferedReader joinReader = new BufferedReader(new FileReader(cacheFile.toString()));
            try {
                String line;
                while ((line = joinReader.readLine()) != null) {
                    // Limit 2: everything after the first comma is the value.
                    String[] tokens = line.split(",", 2);
                    if (tokens.length < 2) {
                        // Blank or malformed line: nothing to join on.
                        continue;
                    }
                    joinData.put(tokens[0], tokens[1]);
                }
            } finally {
                joinReader.close();
            }
        }

        /**
         * Emits {@code key -> value + "," + joinValue} when the key exists in
         * the join table; records with no match are dropped.
         */
        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                output.collect(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }

    /**
     * Configures and runs the map-only join job.
     *
     * @param args {@code <cache file> <input path> <output path>}
     * @return 0 when the job was run, 2 when too few arguments were supplied
     * @throws Exception if job submission or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: MapperSideJoinWithDistributedCache"
                    + " <cache file> <input path> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MapperSideJoinWithDistributedCache.class);

        // Register the lookup file on the JobConf that is actually submitted.
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job);

        Path in = new Path(args[1]);
        Path out = new Path(args[2]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin with DistributedCache");
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0); // map-only: the join happens entirely in the mapper
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        // Input records are split into key and value at the first comma.
        job.set("key.value.separator.in.input.line", ",");

        JobClient.runJob(job);
        return 0;
    }

    /**
     * Entry point: runs the tool through {@link ToolRunner}, prints the start
     * and end timestamps plus the elapsed milliseconds, and exits with the
     * tool's return code.
     */
    public static void main(String[] args) throws Exception {
        long time1 = System.currentTimeMillis();
        System.out.println(time1);
        int res = ToolRunner.run(new Configuration(),
                new MapperSideJoinWithDistributedCache(), args);
        long time2 = System.currentTimeMillis();
        System.out.println(time2);
        System.out.println("millsecs elapsed:" + (time2 - time1));
        System.exit(res);
    }
}

我得到的错误是
O mapred.MapTask: numReduceTasks: 0
Exception reading DistributedCache: java.io.FileNotFoundException: \tmp\hadoop-LopezGG\mapred\local\archive\-2564469513526622450_-1173562614_1653082827\file\C\Users\LopezGG\workspace\Second_join\input1_1 (The system cannot find the file specified)
ds[Lorg.apache.hadoop.fs.Path;@366a88bb
12/04/24 23:15:01 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/04/24 23:15:01 INFO mapred.LocalJobRunner:

但任务仍然执行完成了。请帮帮我:我查看了其他帖子并做了所有建议的修改,但仍然无法正常工作。

最佳答案

我必须承认,我从未直接使用过 DistributedCache 类(而是通过 GenericOptionsParser 使用 -files 选项),但我不确定 DistributedCache 是否会在运行作业之前自动将本地文件复制到 HDFS 中。

虽然我在 Hadoop 文档中找不到任何证据证明这一事实,但在 Pro Hadoop 书中提到了一些与此有关的内容:

  • http://books.google.com/books?id=8DV-EzeKigQC&pg=PA133&dq=%22The+URI+must+be+on+the+JobTracker+shared+file+system%22&hl=en&sa=X&ei=jNGXT_LKOKLA6AG1-7j6Bg&ved=0CEsQ6AEwAA#v=onepage&q=%22The%20URI%20must%20be%20on%20the%20JobTracker%20shared%20file%20system%22&f=false

  • 在您的情况下,请先将文件复制到 HDFS,然后在调用 DistributedCache.addCacheFile 时传入该文件在 HDFS 中的 URI,看看这样是否可行。

    关于hadoop - 在 Hadoop 0.20 中使用分布式缓存进行复制连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10309320/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com