Hadoop DistributedCache is deprecated - what is the preferred API?

6uxekuva · posted 2021-06-03 in Hadoop

My map tasks need some configuration data, which I would like to distribute via the distributed cache.
The Hadoop MapReduce Tutorial shows the usage of the DistributedCache class, roughly as follows:

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

However, DistributedCache is marked as deprecated in Hadoop 2.2.0.
What is the new preferred way to achieve this? Is there an up-to-date example or tutorial covering that API?


bvjxkvbb1#

To expand on @jtravaglini's answer: the way to use the distributed cache with YARN/MapReduce 2 is as follows.
In your driver, use Job.addCacheFile():
public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "MyJob");

    job.setMapperClass(MyMapper.class);

    // ...

    // Mind the # sign after the absolute file location.
    // You will be using the name after the # sign as your
    // file name in your Mapper/Reducer
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));

    return job.waitForCompletion(true) ? 0 : 1;
}

In your Mapper/Reducer, override the `setup(Context context)` method:

@Override
protected void setup(
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
    if (context.getCacheFiles() != null
            && context.getCacheFiles().length > 0) {

        File some_file = new File("./some");
        File other_file = new File("./other");

        // Do things to these two files, like read them
        // or parse as JSON or whatever.
    }
    super.setup(context);
}
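For illustration, here is a minimal sketch of the "read them" part inside that if block, assuming the cached file is plain text that should be consumed line by line (the JSON case works the same with whatever parser you prefer; java.io.BufferedReader and java.io.FileReader imports are assumed):

// Inside the if block above: read the file localized under the "#some" symlink.
try (BufferedReader reader = new BufferedReader(new FileReader(some_file))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // e.g. collect the lines into a field for later use in map()/reduce()
    }
}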


f45qwnt82#

The API for the distributed cache can be found on the Job class itself. Check the documentation here: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html The code should be:

Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());

In your mapper code:

Path[] localPaths = context.getLocalCacheFiles();
...

relj7zay3#

None of the solutions mentioned above worked completely for me, probably because Hadoop versions keep changing; I am using Hadoop 2.6.4. DistributedCache is indeed deprecated, so I did not want to use it. As some of the posts suggest, however, addCacheFile() does the job, though its usage has changed a bit. Here is how it worked for me:

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));

Here X.X.X.X can be the master IP address or localhost. EnglishStop.txt is stored in HDFS at the root (/) location.

hadoop fs -ls /

The output is:

-rw-r--r--   3 centos supergroup       1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x   - centos supergroup          0 2016-03-12 19:46 /test

What is funny but convenient is that #EnglishStop.txt means we can now access the file as "EnglishStop.txt" in the mapper. Here is the code for that:

public void setup(Context context) throws IOException, InterruptedException
{
    File stopwordFile = new File("EnglishStop.txt");
    FileInputStream fis = new FileInputStream(stopwordFile);
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

    String stopWord;
    while ((stopWord = reader.readLine()) != null) {
        // stopWord is a word read from the cached file
    }
    reader.close();
}

This worked just fine for me. You can read lines from a file stored in HDFS.


rnmwe5a24#

I had the same problem. Not only is DistributedCache deprecated, but getLocalCacheFiles and "new Job" are too. So what worked for me is the following:
Driver:

Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());

In the Mapper/Reducer setup:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null

    Path file1path = new Path(files[0]);
    ...
}
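To round that out, here is a hedged sketch of how the returned Path could then be read, assuming you want to stream it line by line through the Hadoop FileSystem API (imports assumed: org.apache.hadoop.fs.FileSystem, java.io.BufferedReader, java.io.InputStreamReader):

// Open the cached file through the file system that owns its URI
// (HDFS in cluster mode, the local file system when running locally).
FileSystem fs = file1path.getFileSystem(context.getConfiguration());
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file1path)))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // use the line, e.g. to build a lookup map for map()/reduce()
    }
}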

yv5phkfx5#

I did not use job.addCacheFile(). Instead I used the -files option as before, e.g. "-files /path/to/myfile.txt#myfile". Then in the mapper or reducer code I use the method below:

/**
 * Looks up a distributed cache file by its symlink (URI fragment) name.
 * This method can be used with local execution or HDFS execution.
 *
 * @param context the job context providing access to the cache file URIs
 * @param symLink the symlink name (the part after the # sign)
 * @param throwExceptionIfNotFound whether to throw if no matching file exists
 * @return the resolved File, or null if not found and not throwing
 * @throws IOException if the file system cannot be accessed
 */
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
{
    URI[] uris = context.getCacheFiles();
    if(uris==null||uris.length==0)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    URI symlinkUri = null;
    for(URI uri: uris)
    {
        if(symLink.equals(uri.getFragment()))
        {
            symlinkUri = uri;
            break;
        }
    }   
    if(symlinkUri==null)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);

}

Then in the mapper/reducer:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
    ... do work ...
}

Note that if I use "-files /path/to/myfile.txt" directly (without the # fragment), then I have to access the file as "myfile.txt", because that is the default symlink name.
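One caveat worth spelling out: the -files generic option is only honored when the driver lets Hadoop parse generic options, typically by running through ToolRunner. Here is a minimal driver sketch under that assumption (MyDriver and "my-job" are placeholder names):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already reflects the -files / -libjars / -D options
        // that ToolRunner parsed from the command line.
        Job job = Job.getInstance(getConf(), "my-job");
        // ... set mapper/reducer, input and output paths ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyDriver(), args));
    }
}

It would then be launched with something like: hadoop jar myjob.jar MyDriver -files /path/to/myfile.txt#myfile <in> <out>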


z0qdvdin6#

The new DistributedCache API for YARN/MR2 can be found in the org.apache.hadoop.mapreduce.Job class.

Job.addCacheFile()

Unfortunately, there are not yet many comprehensive, tutorial-style examples of it.
http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29
