在hadoop mapreduce中的多线程Mapper类的内部线程Map器之间共享大型对象?

wnavrhmk  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(366)

我在java中有一个简单的hadoop作业,它使用一个Map器逐行处理我的文件。每个Map器不受cpu限制,但应该在内存中保存一个非常大的对象(在我的例子中是bloom过滤器),其大小为2-15gb(取决于计算精度)。在Map绘制者的 setup() 方法我从磁盘读取这个对象并创建它。
我遇到了 MultithreadedMapper 类以在多个线程中执行计算。

job.setMapperClass(MultithreadMapper.class);
// ...
MultithreadedMapper.setMapperClass(job, MySingleThreadMapper.class);
MultithreadedMapper.setNumberOfThreads(job, 16);

但看起来 MultithreadedMapper 使用内部 private class MapRunner extends Thread 要生成线程Map器,请执行以下操作:

public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
//...
    public void run(Context context) throws IOException, InterruptedException {
    // ...

        runners =  new ArrayList<MapRunner>(numberOfThreads);
        for(int i=0; i < numberOfThreads; ++i) {
            MapRunner thread = new MapRunner(context);
            thread.start();
            runners.add(i, thread);
        }
    }
}

问题是:如何在multi-threadedmapper中创建一次非常大的对象,并在集群节点(同一jvm)的线程Map器之间共享它(使用上下文或其他什么)?
我试图通过一个单例模式,但如果似乎不是一个美丽的解决方案。

aij0ehis

aij0ehis1#

序言:我以前从未这样做过,但我会使用静态锁进行初始化:

static class MySingleThreadMapper extends Mapper<LongWritable, Text, Text, Text> {

    static MyResource sharedResource;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        synchronized (MySingleThreadMapper.class) {
             if (sharedResource == null) {
                 sharedResource = createResource();
             }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       // mapper code
       // sharedResource will be initialized here
    }
}

正如您可能已经知道的那样,hadoop在单独的jvm示例中生成Map并减少任务。因此,所有的单线程Map器都将在同一个jvm中运行,您可以依赖静态锁。您可以使用任何其他静态对象作为锁,共享资源将只初始化一次。

相关问题