我正在为mapsidejoin编写一个程序。关于我的项目中存在的缓存文件的路径,我得到了一个错误。
Map器类:
public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text>{
private Map<String, String> departmentMap = new HashMap<String, String>();
private BufferedReader br;
private String strDeptName = new String();
private Text txtMapOutputKey = new Text(" ");
private Text txtMapOutputValue = new Text(" ");
enum MyCounter {
RECORD_COUNT,
FILE_EXISTS,
FILE_NOT_FOUND,
OTHER_EXCEPTION
}
// This method gets the file from cache and reads it
protected void setup(Context ctx) throws IOException {
URI[] cachefiles = ctx.getCacheFiles();
for(URI eachUri: cachefiles) {
if(eachUri.toString().equals("department")) {
ctx.getCounter(MyCounter.FILE_EXISTS).increment(1);
loadDepartmentsHashMap(eachUri,ctx);
}
}
}
// This method loads the department file into the memory
private void loadDepartmentsHashMap(URI eachUri,Context ctx) throws IOException {
String readLine = new String();
try {
br = new BufferedReader(new FileReader(eachUri.toString())); // Load the cache file and read line by line
while((readLine = br.readLine()) != null) {
String[] deptFieldArray = readLine.split("\t");
departmentMap.put(deptFieldArray[0], deptFieldArray[1]);
}
} catch(FileNotFoundException e) {
e.printStackTrace();
ctx.getCounter(MyCounter.FILE_NOT_FOUND).increment(1);
} catch(IOException e) {
e.printStackTrace();
ctx.getCounter(MyCounter.OTHER_EXCEPTION).increment(1);
} finally {
if(br != null) {
br.close();
}
}
}
public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
ctx.getCounter(MyCounter.RECORD_COUNT).increment(1);
if(value.toString().length() > 0) {
String[] empAttributes = value.toString().split("\t");
try {
strDeptName = departmentMap.get(empAttributes[3].toString()); // This method gets the department for the employees that are present in both the files
} finally {
strDeptName = ((strDeptName.equals(null) || strDeptName.equals(""))?"Not-Found":strDeptName);
}
txtMapOutputKey.set(empAttributes[0].toString());
txtMapOutputValue.set(empAttributes[0].toString() + "\t" +
empAttributes[1].toString() + "\t" +
empAttributes[2].toString() + "\t" +
empAttributes[3].toString() + "\t" +
strDeptName );
}
ctx.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = " ";
}
}
我的驾驶员等级:
public class MapJoinDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int exitcode = ToolRunner.run(new Configuration(), new MapJoinDriver(), args);
System.exit(exitcode);
}
@Override
public int run(String[] args) throws Exception {
if(args.length != 2) {
System.out.printf("Two parameters are required - <input dir> <output dir>");
return -1;
}
Job job = Job.getInstance(new Configuration());
job.setJobName("Map side join with Distributed Cache");
job.addCacheFile(new Path("/MapSideJoin/data/department").toUri());
job.setJarByClass(MapJoinDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapSideJoinMapper.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success?0:1;
}
}
我得到的错误是:
当我在linux中执行jar文件时,我得到一个异常“file does not exists”。我试着用不同的方法来运行它。有些我修不好。
下面是我的项目的层次结构
谁能告诉我文件路径的错误是什么?我的问题不是在哪里添加文件夹或文件。我想修复路径异常,它将正确读取文件夹中存在的文件。
暂无答案!
目前还没有任何答案,快来回答吧!