我正在尝试创建一个map-reduce程序来执行k-means算法。我知道使用map reduce不是做迭代算法的最好方法。我已经创建了mapper和reducer类。在mapper代码中,我读取了一个输入文件。当map reduce完成后,我希望结果存储在同一个输入文件中。如何使输出文件覆盖Map器中输入的文件?同样,我使Mapreduce迭代,直到旧输入文件和新输入文件的值收敛,即值之间的差值小于0.1
我的代码是:
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.FileReader;
import java.io.BufferedReader;
import java.util.ArrayList;
public class kmeansMapper extends Mapper<Object, Text, DoubleWritable,
DoubleWritable> {
private final static String centroidFile = "centroid.txt";
private List<Double> centers = new ArrayList<Double>();
public void setup(Context context) throws IOException{
BufferedReader br = new BufferedReader(new
FileReader(centroidFile));
String contentLine;
while((contentLine = br.readLine())!=null){
centers.add(Double.parseDouble(contentLine));
}
}
public void map(Object key, Text input, Context context) throws IOException,
InterruptedException {
String[] fields = input.toString().split(" ");
Double rating = Double.parseDouble(fields[2]);
Double distance = centers.get(0) - rating;
int position = 0;
for(int i=1; i<centers.size(); i++){
Double cDistance = Math.abs(centers.get(i) - rating);
if(cDistance< distance){
position = i;
distance = cDistance;
}
}
Double closestCenter = centers.get(position);
context.write(new DoubleWritable(closestCenter),new
DoubleWritable(rating)); //outputs closestcenter and rating value
}
}
import java.io.IOException;
import java.lang.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.*;
public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable,
DoubleWritable, Text> {
public void reduce(DoubleWritable key, Iterable<DoubleWritable> values,
Context context)// get count // get total //get values in a string
throws IOException, InterruptedException {
Iterator<DoubleWritable> v = values.iterator();
double total = 0;
double count = 0;
String value = ""; //value is the rating
while (v.hasNext()){
double i = v.next().get();
value = value + " " + Double.toString(i);
total = total + i;
++count;
}
double nCenter = total/count;
context.write(new DoubleWritable(nCenter), new Text(value));
}
}
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class run
{
public static void runJob(String[] input, String output) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
Path toCache = new Path("input/centroid.txt");
job.addCacheFile(toCache.toUri());
job.setJarByClass(run.class);
job.setMapperClass(kmeansMapper.class);
job.setReducerClass(kmeansReducer.class);
job.setMapOutputKeyClass(DoubleWritable.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setNumReduceTasks(1);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath,true);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]);
}
}
谢谢
1条答案
按热度按时间3xiyfsfu1#
我知道你把免责声明。。但请切换到spark或其他可以解决内存问题的框架。你的生活会好得多。
如果您真的想这样做,只需在runjob中迭代运行代码,并使用一个临时文件名作为输入。您可以在hadoop中看到这个关于移动文件的问题。您需要一个文件系统示例和一个临时文件作为输入:
一般来说,每次迭代完成后
当然,对于第一次迭代,必须将输入路径设置为运行作业时提供的输入路径。后续迭代可以使用tempinputpath,它将是前一个迭代的输出。