gpt4 book ai didi

java - 在 Hadoop Mapreduce 的 MultithreadedMapper 类的内部线程映射器之间共享大对象?

转载 作者:可可西里 更新时间:2023-11-01 14:51:46 25 4
gpt4 key购买 nike

我在 java 中有一个简单的 Hadoop 作业,它带有一个逐行处理我的文件的映射器。每个映射器都不受 CPU 限制,但应该在内存中保存一个非常大的对象(在我的例子中是一个 Bloom Filter),其大小为 2-15 GB(取决于计算精度)。在 Mapper 的 setup() 方法中,我从磁盘读取这个对象并创建它。

我遇到了 MultithreadedMapper 类来在多线程中执行我的计算。

job.setMapperClass(MultithreadMapper.class);
// ...
MultithreadedMapper.setMapperClass(job, MySingleThreadMapper.class);
MultithreadedMapper.setNumberOfThreads(job, 16);

但似乎 MultithreadedMapper 使用内部 private class MapRunner extends Thread 来生成线程映射器:

public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
//...
public void run(Context context) throws IOException, InterruptedException {
// ...

runners = new ArrayList<MapRunner>(numberOfThreads);
for(int i=0; i < numberOfThreads; ++i) {
MapRunner thread = new MapRunner(context);
thread.start();
runners.add(i, thread);
}
}
}

这里的问题是:如何在 MultithreadedMapper 中创建非常非常大的对象并在集群节点(相同的 jvm)的线程映射器之间共享它(使用上下文或其他)?

我试图通过单例模式来实现它,但似乎不是一个很好的解决方案。

最佳答案

前言:我以前从未这样做过,但我会使用静态锁进行初始化来实现它:

static class MySingleThreadMapper extends Mapper<LongWritable, Text, Text, Text> {

static MyResource sharedResource;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
synchronized (MySingleThreadMapper.class) {
if (sharedResource == null) {
sharedResource = createResource();
}
}
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// mapper code
// sharedResource will be initialized here
}
}

您可能已经知道,Hadoop 在单独的 JVM 实例中生成其 Map 和 Reduce 任务。所以你所有的单线程映射器都将在同一个 JVM 中运行,你可以依赖静态锁。您可以使用任何其他静态对象作为锁,您的共享资源只会被初始化一次。

关于java - 在 Hadoop Mapreduce 的 MultithreadedMapper 类的内部线程映射器之间共享大对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42785066/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com