我已经编写了一个简单的mapreduce作业(基于单词计数示例)来获取文本文件中的单词总数。我一行一行地检查文件,在Map它之前,我做一些处理。除了在Map前从行中删除某些单词外,所有这些似乎都起作用。
在开始作业之前,我从一个文件中读取了一个单词列表,在Map一行之前应该删除这些单词。我让程序在读入后打印出单词列表,效果很好。问题是:一旦作业开始,我的包含单词的arraylist似乎又是空的。有趣的是,只有在eclipse(jar文件)之外启动程序时才会发生这种情况,在eclipse中,单词get deleted。eclipse之外的最终结果是1320万个单词,尽管它应该是1340万个单词(没有从列表中删除这些单词)。在eclipse中,结果是840万。
为什么?非常感谢你的帮助!
这是我的密码:
import java.io.*;
import java.util.*;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, NullWritable, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private final static NullWritable nullKey = NullWritable.get();
public void map(LongWritable key, Text value, OutputCollector< NullWritable, IntWritable> output, Reporter reporter) throws IOException {
String processedline = LineProcessor.processLine(value.toString());
StringTokenizer tokenizer = new StringTokenizer(processedline);
while (tokenizer.hasMoreTokens()) {
tokenizer.nextToken();
output.collect(nullKey, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
private final static NullWritable nullKey = NullWritable.get();
public void reduce(NullWritable key, Iterator<IntWritable> values, OutputCollector<NullWritable, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(nullKey, new IntWritable(sum));
}
}
public static class LineProcessor{
public static ArrayList<String> stopWordsList = new ArrayList<String>();
public static void initializeStopWords() throws IOException{
Path stop_words = new Path("/user/ds2013/stop_words/english_stop_list.txt");
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(stop_words)));
String stopWord;
stopWord = br.readLine();
while (stopWord != null){
//addToStopWords
stopWordsList.add(stopWord);
stopWord = br.readLine();
}
}
public static String processLine(String line) {
line = line.toLowerCase();
//delete some punctuation
char[] remove = {'.', ',','"'};
for (char c : remove) {
line = line.replace(""+c, "");
}
//Replace "-" with Space
line = line.replace("-", " ");
//delete stop Words
StringTokenizer tokenizer = new StringTokenizer(line);
String nextWord = tokenizer.nextToken();
while (tokenizer.hasMoreTokens()) {
if(stopWordsList.contains(nextWord)){
line = line.replace(nextWord, "");
}
nextWord = tokenizer.nextToken();
}
return line;
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setMapOutputKeyClass(NullWritable.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
//initialize List of words that should be deletet
LineProcessor.initializeStopWords();
//Directories
FileInputFormat.setInputPaths(conf, new Path("/user/ds2013/data/plot_summaries.txt"));
Path outputDir = new Path( args[0] );
//delete output folder if it already exists
FileSystem fs = FileSystem.get(conf);
fs.delete(outputDir, true);
FileOutputFormat.setOutputPath(conf, outputDir);
JobClient.runJob(conf);
}
}
1条答案
按热度按时间siv3szwd1#
如果您通过命令行提交作业,它将为此创建一个客户端进程。因此,在主方法中进行初始化:
在一个完全不同的进程中运行。通常,您会将此init内容移动到Map器中的一个设置函数中,您可以覆盖该函数(在您使用的旧api中是):
或者在较新的api中: