hadoop mapreduce,如何用mapreduce输出重写输入到mapper中的txt文件?

6g8kf2rb  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(356)

我正在尝试创建一个map-reduce程序来执行k-means算法。我知道使用map reduce不是做迭代算法的最好方法。我已经创建了mapper和reducer类。在mapper代码中,我读取了一个输入文件。当map reduce完成后,我希望结果存储在同一个输入文件中。如何使输出文件覆盖Map器中输入的文件?同样,我使Mapreduce迭代,直到旧输入文件和新输入文件的值收敛,即值之间的差值小于0.1
我的代码是:

import java.io.IOException;
 import java.util.StringTokenizer;
 import java.util.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.Mapper;
 import java.io.FileReader;
 import java.io.BufferedReader;
 import java.util.ArrayList;

public class kmeansMapper extends Mapper<Object, Text, DoubleWritable, 
DoubleWritable> {
private final static String centroidFile = "centroid.txt";
private List<Double> centers = new ArrayList<Double>();

public void setup(Context context) throws IOException{
        BufferedReader br = new BufferedReader(new 
        FileReader(centroidFile));
        String contentLine;
        while((contentLine = br.readLine())!=null){
            centers.add(Double.parseDouble(contentLine));
        }
}

public void map(Object key, Text input, Context context) throws IOException, 
InterruptedException {

        String[] fields = input.toString().split("  ");
        Double rating = Double.parseDouble(fields[2]);
        Double distance = centers.get(0) - rating;
        int position = 0;
        for(int i=1; i<centers.size(); i++){
            Double cDistance = Math.abs(centers.get(i) - rating);
            if(cDistance< distance){
                position = i;
                distance = cDistance;
            }
        }
        Double closestCenter = centers.get(position);
        context.write(new DoubleWritable(closestCenter),new 
DoubleWritable(rating)); //outputs closestcenter and rating value

        }
}
import java.io.IOException;
import java.lang.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.*;

public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable, 
DoubleWritable, Text> {

public void reduce(DoubleWritable key, Iterable<DoubleWritable> values, 
Context context)// get count // get total //get values in a string
          throws IOException, InterruptedException {
            Iterator<DoubleWritable> v = values.iterator();
            double total = 0;
            double count = 0;
            String value = ""; //value is the rating
            while (v.hasNext()){
              double i = v.next().get();
              value = value + " " + Double.toString(i);
              total = total + i;
              ++count;
            }
            double nCenter = total/count;
  context.write(new DoubleWritable(nCenter), new Text(value));
}
}
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class run
{

 public static void runJob(String[] input, String output) throws Exception {

    Configuration conf = new Configuration();

  Job job = new Job(conf);
  Path toCache = new Path("input/centroid.txt"); 
  job.addCacheFile(toCache.toUri());
  job.setJarByClass(run.class);
  job.setMapperClass(kmeansMapper.class);
  job.setReducerClass(kmeansReducer.class);
  job.setMapOutputKeyClass(DoubleWritable.class);
  job.setMapOutputValueClass(DoubleWritable.class);

  job.setNumReduceTasks(1);
  Path outputPath = new Path(output);
  FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
  FileOutputFormat.setOutputPath(job, outputPath);
  outputPath.getFileSystem(conf).delete(outputPath,true);
  job.waitForCompletion(true);

}

public static void main(String[] args) throws Exception {
   runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]);

}

}

谢谢

3xiyfsfu

3xiyfsfu1#

我知道你把免责声明。。但请切换到spark或其他可以解决内存问题的框架。你的生活会好得多。
如果您真的想这样做,只需在runjob中迭代运行代码,并使用一个临时文件名作为输入。您可以在hadoop中看到这个关于移动文件的问题。您需要一个文件系统示例和一个临时文件作为输入:

FileSystem fs = FileSystem.get(new Configuration());
Path tempInputPath = Paths.get('/user/th/kmeans/tmp_input';

一般来说,每次迭代完成后

fs.delete(tempInputPath)
fs.rename(outputPath, tempInputPath)

当然,对于第一次迭代,必须将输入路径设置为运行作业时提供的输入路径。后续迭代可以使用tempinputpath,它将是前一个迭代的输出。

相关问题