mapjoin程序中的文件路径错误

rbpvctlc  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(183)

我正在为mapsidejoin编写一个程序。关于我的项目中存在的缓存文件的路径,我得到了一个错误。
Map器类:

public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text>{
    private Map<String, String> departmentMap = new HashMap<String, String>();
    private BufferedReader br;
    private String strDeptName = new String();
    private Text txtMapOutputKey = new Text(" ");
    private Text txtMapOutputValue = new Text(" ");

    enum MyCounter {
        RECORD_COUNT,
        FILE_EXISTS,
        FILE_NOT_FOUND,
        OTHER_EXCEPTION
    }

    // This method gets the file from cache and reads it
    protected void setup(Context ctx) throws IOException {
        URI[] cachefiles = ctx.getCacheFiles();
        for(URI eachUri: cachefiles) {
            if(eachUri.toString().equals("department")) {
                ctx.getCounter(MyCounter.FILE_EXISTS).increment(1);
                loadDepartmentsHashMap(eachUri,ctx);
            }
        }
    }

    // This method loads the department file into the memory
    private void loadDepartmentsHashMap(URI eachUri,Context ctx) throws IOException {
        String readLine = new String();
        try {
            br = new BufferedReader(new FileReader(eachUri.toString()));    // Load the cache file and read line by line
            while((readLine = br.readLine()) != null) {
                String[] deptFieldArray = readLine.split("\t");
                departmentMap.put(deptFieldArray[0], deptFieldArray[1]);
            }
        } catch(FileNotFoundException e) {
            e.printStackTrace();
            ctx.getCounter(MyCounter.FILE_NOT_FOUND).increment(1);
        } catch(IOException e) {
            e.printStackTrace();
            ctx.getCounter(MyCounter.OTHER_EXCEPTION).increment(1);
        } finally {
            if(br != null) {
                br.close();
            }
        }
    }

    public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        ctx.getCounter(MyCounter.RECORD_COUNT).increment(1);
        if(value.toString().length() > 0) {
            String[] empAttributes = value.toString().split("\t");
            try {
                strDeptName = departmentMap.get(empAttributes[3].toString()); // This method gets the department for the employees that are present in both the files
            } finally {
                strDeptName = ((strDeptName.equals(null) || strDeptName.equals(""))?"Not-Found":strDeptName);
            }
            txtMapOutputKey.set(empAttributes[0].toString());
            txtMapOutputValue.set(empAttributes[0].toString() + "\t" + 
                                  empAttributes[1].toString() + "\t" + 
                                  empAttributes[2].toString() + "\t" + 
                                  empAttributes[3].toString() + "\t" + 
                                  strDeptName );
        }
        ctx.write(txtMapOutputKey, txtMapOutputValue);
        strDeptName = " ";
    }
}

我的驾驶员等级:

public class MapJoinDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
    int exitcode = ToolRunner.run(new Configuration(), new MapJoinDriver(), args);
    System.exit(exitcode);
}

@Override
public int run(String[] args) throws Exception {
    if(args.length != 2) {
        System.out.printf("Two parameters are required - <input dir> <output dir>");
        return -1;
    }
    Job job = Job.getInstance(new Configuration());
    job.setJobName("Map side join with Distributed Cache");
    job.addCacheFile(new Path("/MapSideJoin/data/department").toUri());
    job.setJarByClass(MapJoinDriver.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(MapSideJoinMapper.class);
    job.setNumReduceTasks(0);
    boolean success = job.waitForCompletion(true);
    return success?0:1;
}
}

我得到的错误是:

当我在linux中执行jar文件时,我得到一个异常“file does not exists”。我试着用不同的方法来运行它。有些我修不好。
下面是我的项目的层次结构

谁能告诉我文件路径的错误是什么?我的问题不是在哪里添加文件夹或文件。我想修复路径异常,它将正确读取文件夹中存在的文件。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题