gpt4 book ai didi

hadoop - 将 Hadoop DistributedCache 与存档一起使用

转载 作者:行者123 更新时间:2023-12-02 20:07:09 25 4
gpt4 key购买 nike

Hadoop的DistributedCache文档似乎没有充分描述如何使用分布式缓存。这是给出的示例:

 // Setting up the cache for the application

1. Copy the requisite files to the FileSystem:

$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

2. Setup the application's JobConf:

JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);

3. Use the cached files in the Mapper
or Reducer:

public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

private Path[] localArchives;
private Path[] localFiles;

public void configure(JobConf job) {
// Get the cached archives/files
File f = new File("./map.zip/some/file/in/zip.txt");
}

public void map(K key, V value,
OutputCollector<K, V> output, Reporter reporter)
throws IOException {
// Use data from the cached archives/files here
// ...
// ...
output.collect(k, v);
}
}

我已经搜索了一个多小时试图弄清楚如何使用它。在拼凑了其他一些 SO 问题之后,这就是我想出的:
public static void main(String[] args) throws Exception {
Job job = new Job(new JobConf(), "Job Name");
JobConf conf = job.getConfiguration();
DistributedCache.createSymlink(conf);
DistributedCache.addCacheArchive(new URI("/ProjectDir/LookupTable.zip", job);
// *Rest of configuration code*
}

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>
{
private Path[] localArchives;

public void configure(JobConf job)
{
// Get the cached archive
File file1 = new File("./LookupTable.zip/file1.dat");
BufferedReader br1index = new BufferedReader(new FileInputStream(file1));
}

public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{ // *Map code* }
}
  • 我应该在哪里调用 void configure(JobConf job)功能?
  • 我在哪里使用 private Path[] localArchives目的?
  • 我的代码在 configure() 中吗?以正确的方式访问存档中的文件并将文件与 BufferedReader 链接?
  • 最佳答案

    我将回答您关于分布式缓存的新 API 和常见做法的问题

  • 我应该在哪里调用 void configure(JobConf job) 函数?

  • 框架将调用 protected 无效设置(上下文上下文)方法在每个map任务开始时执行一次,与使用缓存文件相关的逻辑通常在这里处理。例如,读取文件并将数据存储在要在 setup() 之后调用的 map() 函数中使用的变量中
  • 我在哪里使用私有(private) Path[] localArchives 对象?

  • 它通常在 setup() 方法中用于检索缓存文件的路径。像这样的东西。
      Path[] localArchive =DistributedCache.getLocalCacheFiles(context.getConfiguration());
  • 我在 configure() 函数中的代码是正确的访问方式吗
    存档中的文件并将文件与 BufferedReader 链接?

  • 它缺少对方法的调用来检索存储缓存文件的路径(如上所示)。检索到路径后,可以按如下方式读取文件。
    FSDataInputStream in = fs.open(localArchive);
    BufferedReader br = new BufferedReader(new InputStreamReader(in));

    关于hadoop - 将 Hadoop DistributedCache 与存档一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21638863/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com