
hadoop - Issues setting up and accessing the distributed cache


For some reason I can't find any good resources online for getting the distributed cache to work with the new API. Hopefully someone here can explain what I'm doing wrong. My current attempt is a mashup of various things I found online.

The program attempts to run the k-nearest-neighbors algorithm. The input file is the test dataset, while the distributed cache holds the training dataset and the training labels. The mapper should take one line of test data, compare it against every line of the distributed-cache data, and return the label of the most similar row.

import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KNNDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: %s [generic options] <input dir> <output dir>\n", getClass().getSimpleName());
            return -1;
        }

        // use the Configuration supplied by ToolRunner so that generic
        // options (-D, -files, ...) are honored instead of being dropped
        Configuration conf = getConf();
        // conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "^");

        conf.setInt("train_rows", 1000);
        conf.setInt("test_rows", 1000);
        conf.setInt("cols", 612);
        DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv"), conf);
        DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv"), conf);

        Job job = new Job(conf);
        job.setJarByClass(KNNDriver.class);
        job.setJobName("KNN");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(KNNMapper.class);
        job.setReducerClass(KNNReducer.class);
        // job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new KNNDriver(), args);
        System.exit(exitCode);
    }
}

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KNNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    int[][] train_vals;
    int[] train_label_vals;
    int train_rows;
    int test_rows;
    int cols;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        // Path[] cacheFiles = context.getLocalCacheFiles();

        // assign to the fields rather than declaring shadowing locals;
        // otherwise map() sees train_rows == 0 and never enters its loop
        train_rows = conf.getInt("train_rows", 0);
        test_rows = conf.getInt("test_rows", 0);
        cols = conf.getInt("cols", 0);

        train_vals = new int[train_rows][cols];
        train_label_vals = new int[train_rows];

        // read the training csv, parse it, and store it in a 2d int array
        Scanner myScan;
        try {
            myScan = new Scanner(new File("train_sample.csv"));

            // set the delimiters used in the file
            myScan.useDelimiter("[,\r\n]+");

            System.out.println("myScan loaded for train_sample");

            for (int row = 0; row < train_rows; row++) {
                for (int col = 0; col < cols; col++) {
                    train_vals[row][col] = Integer.parseInt(myScan.next());
                }
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.println("Error: Train file not found.");
        }

        // read the training-labels csv, parse it, and store it in an int array
        try {
            myScan = new Scanner(new File("train_labels.csv"));

            // set the delimiters used in the file
            myScan.useDelimiter("[,\r\n]+");

            System.out.println("myScan loaded for train_labels");

            for (int row = 0; row < train_rows; row++) {
                train_label_vals[row] = Integer.parseInt(myScan.next());
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.println("Error: Train Labels file not found.");
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // setup() gave us train_vals & train_label_vals.
        // Each line in map() represents a test observation. We iterate
        // through every train_vals row to find the nearest L2 match, then
        // emit a key/value pair of <observation #, predicted label>.

        // comma-delimited input: the first field is the row id,
        // the remaining fields are the pixel values
        String line = value.toString();
        String[] pixels = line.split(",");
        IntWritable rowId = new IntWritable(Integer.parseInt(pixels[0]));

        long distance;
        double best_distance = Double.POSITIVE_INFINITY;
        int best_digit = -1;

        for (int i = 0; i < train_rows; i++) {
            distance = 0;

            for (int j = 1; j < cols; j++) {
                // '^' is bitwise XOR in Java, not exponentiation, so the
                // squared difference has to be written out explicitly
                int diff = Integer.parseInt(pixels[j]) - train_vals[i][j - 1];
                distance += (long) diff * diff;
            }
            if (distance < best_distance) {
                best_distance = distance;
                best_digit = train_label_vals[i];
            }
        }
        context.write(rowId, new IntWritable(best_digit));
    }
}

I commented out the Path... statement because I don't understand what it does or how it gets the file data to the mapper, but I noticed it listed on several websites. Currently, the program cannot find the distributed-cache datasets even though they have been uploaded to HDFS.
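(For context: the commented-out call is how the old mapreduce API hands the cache back to a task. getLocalCacheFiles() returns the task-local paths of everything registered with DistributedCache.addCacheFile(); a minimal sketch of how it is typically used inside setup():)

Path[] cacheFiles = context.getLocalCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
    // cacheFiles[0] is the local copy of the first registered file,
    // here train_sample.csv
    Scanner myScan = new Scanner(new File(cacheFiles[0].toString()));
}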

Best answer

Try using symlinks:

DistributedCache.createSymlink(conf);
DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv#train_sample.csv"),conf);
DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv#train_labels.csv"),conf);

This makes the files available in each mapper's local working directory under the names your code is actually trying to open.
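For reference, DistributedCache was deprecated in later Hadoop releases; on Hadoop 2.x the same mechanism, including the "#name" symlink fragment, is exposed directly on Job and on the task context. A minimal sketch, assuming Hadoop 2.x and reusing the paths from the question:

// driver: register the cache files on the Job itself; the "#name" fragment
// controls the symlink created in each task's working directory
Job job = Job.getInstance(conf, "KNN");
job.addCacheFile(new URI("cacheData/train_sample.csv#train_sample.csv"));
job.addCacheFile(new URI("cacheData/train_labels.csv#train_labels.csv"));

// mapper setup(): the registered URIs are available from the context, or the
// files can simply be opened by their symlinked names, e.g. new File("train_sample.csv")
URI[] cacheFiles = context.getCacheFiles();

Either way the mapper can keep opening the files by their bare names, as long as the fragment matches the name the code expects.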

Regarding hadoop - Issues setting up and accessing the distributed cache, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/21247085/
