将缓存文件添加到具有 job.addCacheFile()
用我的制图器把它拉下来 context.getCacheFiles()
. 如何打开缓存文件。我试过使用: BufferedReader reader = new BufferedReader(new FileReader(filename));
(评论如下)
其中filename是 toString()
的 URI
,但我得到一个ioexception文件不存在。有人能帮我吗?
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
public class UFOLocation2
{
public static class MapClass extends Mapper<LongWritable, Text, Text, LongWritable>
{
private final static LongWritable one = new LongWritable(1);
private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");
private Map<String, String> stateNames;
@Override
public void setup(Context context)
{
try
{
URI[] cacheFiles = context.getCacheFiles();
setupStateMap(cacheFiles[0].toString());
}
catch (IOException ioe)
{
System.err.println("Error reading state file.");
ioe.printStackTrace();
System.exit(1);
}
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String line = value.toString();
String[] fields = line.split("\t");
String location = fields[2].trim();
if (location.length() >= 2)
{
Matcher matcher = locationPattern.matcher(location);
if (matcher.find())
{
int start = matcher.start();
String state = location.substring(start, start + 2);
context.write(new Text(lookupState(state.toUpperCase())), one);
}
}
}
private void setupStateMap(String filename) throws IOException
{
Map<String, String> states = new HashMap<String, String>();
// the following line causes an IOException
BufferedReader reader = new BufferedReader(new FileReader(filename));
String line = reader.readLine();
while (line != null)
{
String[] split = line.split("\t");
states.put(split[0], split[1]);
line = reader.readLine();
}
stateNames = states;
}
private String lookupState(String state)
{
String fullName = stateNames.get(state);
return fullName == null ? "Other" : fullName;
}
}
public static void main(String[] args) throws Exception
{
Configuration config = new Configuration();
Job job = Job.getInstance(config, "UFO Location 2");
job.setJarByClass(UFOLocation2.class);
job.addCacheFile(new URI("/user/kevin/data/states.txt"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
Configuration mapconf1 = new Configuration(false);
ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class,
Text.class, LongWritable.class, Text.class, mapconf1);
Configuration mapconf2 = new Configuration(false);
ChainMapper.addMapper(job, MapClass.class, LongWritable.class,
Text.class, Text.class, LongWritable.class, mapconf2);
job.setMapperClass(ChainMapper.class);
job.setCombinerClass(LongSumReducer.class);
job.setReducerClass(LongSumReducer.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
这是代码问题还是配置问题?我在一个所谓的psuedo分布式集群上运行
1条答案
按热度按时间91zkwejq1#
你可以用这样的东西