java - Hadoop DistributedCache object changed during the job


I'm trying to run KMeans on AWS, and I hit the following exception when attempting to read the updated cluster centroids from the DistributedCache:

java.io.IOException: The distributed cache object s3://mybucket/centroids_6/part-r-00009 changed during the job from 4/8/13 2:20 PM to 4/8/13 2:20 PM
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.downloadCacheObject(TrackerDistributedCacheManager.java:401)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.localizePublicCacheObject(TrackerDistributedCacheManager.java:475)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.getLocalCache(TrackerDistributedCacheManager.java:191)
at org.apache.hadoop.filecache.TaskDistributedCacheManager.setupCache(TaskDistributedCacheManager.java:182)
at org.apache.hadoop.mapred.TaskTracker$4.run(TaskTracker.java:1246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1237)
at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1152)
at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2541)
at java.lang.Thread.run(Thread.java:662)

What sets this issue apart from this one is the fact that the error occurs intermittently. I've run the same code successfully on smaller datasets. Furthermore, when I change the number of centroids from 12 (seen in the code above) to 8, it fails on iteration 5 instead of 6 (which you can see in the centroids_6 name above).
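(For context, the task-side read isn't shown below; the usual pattern with this API is to fetch the localized file paths in setup(), roughly as in this sketch, with the SequenceFile parsing elided.)

    // Sketch of the task-side read (simplified): files registered with
    // DistributedCache.addCacheFile() in the driver are localized onto each
    // node, and getLocalCacheFiles() returns their local paths.
    protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        if (localFiles != null) {
            for (Path p : localFiles) {
                // open p as a SequenceFile and load the centroids (elided)
            }
        }
    }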

Here is the relevant DistributedCache code from the main driver that runs the KMeans loop:

    int iteration = 1;
    long changes = 0;
    do {
        // First, write the previous iteration's centroids to the dist cache.
        Configuration iterConf = new Configuration();
        Path prevIter = new Path(centroidsPath.getParent(),
                String.format("centroids_%s", iteration - 1));
        FileSystem fs = prevIter.getFileSystem(iterConf);
        Path pathPattern = new Path(prevIter, "part-*");
        FileStatus[] list = fs.globStatus(pathPattern);
        for (FileStatus status : list) {
            DistributedCache.addCacheFile(status.getPath().toUri(), iterConf);
        }

        // Now, set up the job.
        Job iterJob = new Job(iterConf);
        iterJob.setJobName("KMeans " + iteration);
        iterJob.setJarByClass(KMeansDriver.class);
        Path nextIter = new Path(centroidsPath.getParent(),
                String.format("centroids_%s", iteration));
        KMeansDriver.delete(iterConf, nextIter);

        // Set input/output formats.
        iterJob.setInputFormatClass(SequenceFileInputFormat.class);
        iterJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Set Mapper, Reducer, Combiner.
        iterJob.setMapperClass(KMeansMapper.class);
        iterJob.setCombinerClass(KMeansCombiner.class);
        iterJob.setReducerClass(KMeansReducer.class);

        // Set MR formats.
        iterJob.setMapOutputKeyClass(IntWritable.class);
        iterJob.setMapOutputValueClass(VectorWritable.class);
        iterJob.setOutputKeyClass(IntWritable.class);
        iterJob.setOutputValueClass(VectorWritable.class);

        // Set input/output paths.
        FileInputFormat.addInputPath(iterJob, data);
        FileOutputFormat.setOutputPath(iterJob, nextIter);

        iterJob.setNumReduceTasks(nReducers);

        if (!iterJob.waitForCompletion(true)) {
            System.err.println("ERROR: Iteration " + iteration + " failed!");
            System.exit(1);
        }
        iteration++;
        changes = iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).getValue();
        iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).setValue(0);
    } while (changes > 0);

How else could the file be modified? The only possibility I can think of is that, upon completion of one iteration, the loop begins again before the previous job's centroids have finished writing. But as the comments note, I invoke each job with waitForCompletion(true), so there shouldn't be any residual parts of a job still running when the loop starts over. Any ideas?

Best Answer

This isn't really an answer as such, but I did realize it was foolish to use the DistributedCache the way I was, rather than reading the previous iteration's results directly from HDFS. Instead, I wrote this method in the main driver:

public static HashMap<Integer, VectorWritable> readCentroids(Configuration conf, Path path)
        throws IOException {
    HashMap<Integer, VectorWritable> centroids = new HashMap<Integer, VectorWritable>();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    // Glob all the reducer outputs from the previous iteration.
    FileStatus[] list = fs.globStatus(new Path(path, "part-*"));
    for (FileStatus status : list) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
        IntWritable key = null;
        VectorWritable value = null;
        try {
            // Instantiate key/value objects of whatever types the file declares.
            key = (IntWritable) reader.getKeyClass().newInstance();
            value = (VectorWritable) reader.getValueClass().newInstance();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        while (reader.next(key, value)) {
            centroids.put(new Integer(key.get()),
                    new VectorWritable(value.get(), value.getClusterId(), value.getNumInstances()));
        }
        reader.close();
    }
    return centroids;
}

This is called in the setup() method of both the Mapper and the Reducer on each iteration, to read in the previous iteration's centroids:

protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    Path centroidsPath = new Path(conf.get(KMeansDriver.CENTROIDS));
    centroids = KMeansDriver.readCentroids(conf, centroidsPath);
}
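For this to work, the driver has to tell each job where the previous iteration's output lives. That part isn't shown above; a minimal sketch, assuming KMeansDriver.CENTROIDS is a plain String configuration key:

    // In the driver loop, in place of the DistributedCache block:
    // point the job at the previous iteration's output directory.
    iterConf.set(KMeansDriver.CENTROIDS, prevIter.toString());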

This allowed me to remove the block of code in the loop from the original question that writes the centroids to the DistributedCache. I've tested it, and it now works on both large and small datasets.

I still don't know why I was getting the error I posted (how would something in the read-only DistributedCache be changed, especially when I was switching HDFS paths on every iteration?), but this works and is a far less hacky way of reading in the centroids.
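For anyone who wants to keep the DistributedCache approach anyway, one possible workaround (an untested sketch, not from the original post; the cacheDir snapshot directory and the copy step are assumptions) is to copy each iteration's part files into a fresh directory before registering them, so the cached objects can't be touched while the job runs:

    // Hypothetical variant of the driver's caching block: copy the part
    // files into an iteration-specific snapshot directory and cache the
    // copies instead of the live reducer outputs.
    Path cacheDir = new Path(centroidsPath.getParent(),
            String.format("cache_%s", iteration - 1));
    fs.mkdirs(cacheDir);
    for (FileStatus status : fs.globStatus(pathPattern)) {
        Path copy = new Path(cacheDir, status.getPath().getName());
        FileUtil.copy(fs, status.getPath(), fs, copy, false, iterConf);
        DistributedCache.addCacheFile(copy.toUri(), iterConf);
    }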

Regarding java - Hadoop DistributedCache object changed during the job, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/15885716/
