gpt4 book ai didi

java - Hadoop MapReduce MapContext.write() 线程安全吗?

转载 作者:行者123 更新时间:2023-12-04 08:05:03 24 4
gpt4 key购买 nike

传统上,Hadoop MapReduce 映射器通过写入映射上下文来顺序处理数据并发出值,本示例摘自 org.apache.hadoop.mapreduce.Mapper API 文档显示:

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
但是,如果映射器决定启动多个线程,每个线程都做一部分工作(在映射器中进一步分而治之)怎么办?每个人都想在准备好时发出数据。
是否 MapContext.write() (真的 TaskInputOutputContext.write(KEYOUT key, VALUEOUT value) )支持在映射器中同时调用?或者我必须同步对 MapContext.write() 的调用吗?并保证它们被顺序调用? (如果我应该和我不应该发生什么样的坏事?)
(我预计回复说我不应该首先在映射器中启动多个线程。虽然我尊重这种观点,但实际上这是我目前正在使用的。)

最佳答案

不,它不是默认设置。这就是为什么他们让你覆盖 run()方法,它处理映射器操作的流程。
这是source code , 以及 的默认实现run :

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
Mapper javaDoc 的最后,这句话给出了一些启示:

Applications may override therun(org.apache.hadoop.mapreduce.Mapper.Context) method to exertgreater control on map processing e.g. multi-threaded Mappers etc.


所以这个想法很明确;您想要多线程环境中的线程安全,这很酷……但是您实现了逻辑。一个 非常基础的这方面的例子可能是(我同步所有,因为我不知道每个方法/元素的细节,仅用于示例):
@Override
public void run(Context context) throws IOException, InterruptedException
{
synchronized(lock) //shared lock, class lock, etc..
{
setup(context);
while (context.nextKeyValue())
map(context.getCurrentKey(), context.getCurrentValue(), context);

cleanup(context);
}
}

虽然没有特别提到这种方法的多线程,但 map() 也会发生类似的情况。 :

protected void map

Called once for each key/value pair in the inputsplit. Most applications should override this, but the default is theidentity function.



在简历中,覆盖这些方法(至少其中一个)应该让您创建一个多线程 Mapper延期。
所以是的, 它可以是线程安全的,但这是你的工作。 默认实现不是 .

更新 - 多线程映射器
好吧,他们确实提供了一个多线程的可运行映射器实现:
无论如何,似乎有关映射/写入操作的同步仍然是您的工作;这只是实现了从不同的池线程调用 run() 的机制。
MultithreadedMapper

Multithreaded implementation for @linkorg.apache.hadoop.mapreduce.Mapper. It can be used instead of thedefault implementation, MapRunner, when the Map operation is not CPUbound in order to improve throughput.

Mapper implementations using this MapRunnable must be thread-safe.

The Map-Reduce job has to be configured with the mapper to use viasetMapperClass(Job, Class) and the number of thread the thread-poolcan use with the getNumberOfThreads(JobContext) method. The defaultvalue is 10 threads.


这里的关键似乎是 setMapperClass()方法;有一个自定义线程安全扩展 Mapper应该作为参数传递。
--
附录 - MultithreadedZipContentLoader
有趣的部分在 main() , 定义使用 MultiMapper 的协议(protocol)
public class MultithreadedZipContentLoader {
public static class ZipContentMapper extends Mapper<Text, Text, DocumentURI, Text>
{
private DocumentURI uri = new DocumentURI();

public void map(Text fileName, Text fileContent, Context context)
throws IOException, InterruptedException
{
uri.setUri(fileName.toString());
context.write(uri, fileContent);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: MultithreadedZipContentLoader configFile inputDir threadCount");
System.exit(2);
}

Job job = Job.getInstance(conf);
job.setJarByClass(MultithreadedZipContentLoader.class);
job.setInputFormatClass(ZipContentInputFormat.class);
job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, ZipContentMapper.class);
MultithreadedMapper.setNumberOfThreads(job, Integer.parseInt(args[2]));
job.setMapOutputKeyClass(DocumentURI.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(ContentOutputFormat.class);

ZipContentInputFormat.setInputPaths(job, new Path(otherArgs[1]));

conf = job.getConfiguration();
conf.addResource(otherArgs[0]);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

class ZipContentInputFormat extends FileInputFormat<Text, Text> {

@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}

@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new ZipContentReader();
}

}

...
这个例子唯一缺少的是扩展 map 中的同步部分。方法,因为在这种情况下似乎没有必要(我猜没有重复的文件uris)。

关于java - Hadoop MapReduce MapContext.write() 线程安全吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66251581/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com