hadoop—如何将MapReduce作业的输出直接写入分布式缓存,以便将其传递给另一个作业

62lalag4  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(319)

我目前正在练习map reduce(hadoop2.2),需要你在其中一个概念上的帮助。
我有一个用例,我想用两个作业来完成。我希望将job1的输出写入分布式缓存,并将其作为输入传递给第二个job。
基本上,我希望避免将第一个作业的输出写入文件,从而导致开销。

用例输入:

歌曲文件-

|id |歌曲|类型|

|s1 |歌曲1 |古典|
|s2 |歌曲2 |爵士乐|
|s2 |歌曲3 |古典音乐|
.
用户分级文件-

|用户id |歌曲id |评分|

|u1型    |  s1级   |  7   |
|u2乐队    |  s2级   |  5   |
|u3级    |  s2级   |  9   |
|u4级    |  s1级   |  7   |
|u5级    |  s5级   |  5   |
|u6型    |  s1级   |  9   |
注意:这两个文件都包含非常大的数据。

用例描述:

找出每首古典歌曲的平均评分。
我提出的实际/预期解决方案是,我将使用两个链式作业。
1.job1:它将获取所有经典歌曲的id并添加到分布式缓存中
2.job2:第二个作业中的mapper根据缓存中的值过滤经典歌曲的评级。reducer将计算每首歌曲的平均评分。
我在网上搜索,看是否可以将作业的输出直接写入分布式缓存,但找不到有用的信息。
我在stackoverflow上发现了类似的问题:

"How to directly send the output of a mapper-reducer to a another mapper-reducer without
 saving the output into the hdfs"

解决方法是使用“sequencefileoutputformat”。
然而,在我的情况下,我希望所有的歌曲ID可以在第二个工作的每个Map器。因此,我认为上述解决方案将不适用于我的情况。
我想使用的另一种方法是运行第一个作业,找到经典歌曲的id并将输出(song id)写入一个文件,创建一个新作业,然后将song id输出文件添加到第二个作业的缓存中。请告知。
非常感谢你的帮助。

piok6c0g

piok6c0g1#

一种方法是在分布式缓存中加载第一个作业的输出,然后启动第二个作业。

//CONFIGURATION

Job job = Job.getInstance(getConf(), "Reading from distributed cache and etc.");
job.setJarByClass(this.getClass());

////////////
FileSystem fs = FileSystem.get(getConf());

/*
 * if you have, for example, a map only job, 
 * that "something" could be "part-"
 */
FileStatus[] fileList = fs.listStatus(PATH OF FIRST JOB OUTPUT, 
                           new PathFilter(){
                                 @Override public boolean accept(Path path){
                                        return path.getName().contains("SOMETHING");
                                 } 
                            } );

for(int i=0; i < fileList.length; i++){ 
    DistributedCache.addCacheFile(fileList[i].getPath().toUri(), job.getConfiguration());
}

//other parameters

Map器:

//in mapper

@Override
public void setup(Context context) throws IOException, InterruptedException {

    //SOME STRUCT TO STORE VALUES READ (arrayList, HashMap..... whatever)
    Object store = null;

    try{
        Path[] fileCached = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        if(fileCached != null && fileCached.length > 0) {
             for(Path file : fileCached) {
                readFile(file);
                }
        }
    } catch(IOException ex) {
        System.err.println("Exception in mapper setup: " + ex.getMessage());
    }

}

private void readFile(Path filePath) {

    try{
        BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath.toString()));
        String line = null;

        while((line = bufferedReader.readLine()) != null) {

            //reading line by line that file and updating our struct store
            //....

        } //end while (cycling over lines in file)

        bufferedReader.close();

    } catch(IOException ex) {
        System.err.println("Exception while reading file: " + ex.getMessage());
    }
} //end readFile method

现在在Map阶段,将文件作为输入传递给作业,并将所需的值存储在结构中 store .
我的答案来自如何在分布式缓存中使用mapreduce输出。

wswtfjt7

wswtfjt72#

遵循第二种方法。
第一个作业将输出写入文件系统。
第二个作业将通过使用作业api而不是 DistributedCache 已弃用的api。
看看新的job-api方法,比如

addCacheFile(URI uri)
getCacheFiles()

等。

0tdrvxhp

0tdrvxhp3#

如果eah记录的大小小于1mb,则可以将中间结果更新到memcached

相关问题