How to reuse Hadoop code efficiently in Spark?

e5nszbig  posted on 2021-05-30 in Hadoop

Hi, I have code written for Hadoop and I am now trying to migrate it to Spark. The mappers and reducers are fairly complex, so I would like to reuse the existing Hadoop mapper and reducer classes in my Spark program. Can someone tell me how to achieve this?
Edit:
So far I have been able to reuse the mapper class of the standard Hadoop word count example in Spark, with the following implementation:
wordcount.java

import scala.Tuple2; 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class wordcount extends Configured implements Serializable {
  public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setMaster("spark://IMPETUS-I0203:7077").setAppName("wordcount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf); //created Spark context
        JavaRDD<String> rec = ctx.textFile("hdfs://localhost:54310/input/words.txt"); //Record Reader

  //creating a  Pair RDD whose key=some arbitrary number, value = a Record
        JavaPairRDD<LongWritable,Text> lines =rec.mapToPair(s->new Tuple2<LongWritable,Text>(new LongWritable(s.length()),new Text(s)));

  //transforming the 'lines' RDD into ('word', 1) tuples by running the Hadoop mapper on each record
      JavaPairRDD<Text,IntWritable> ones = lines.flatMapToPair(it -> {

          NotSerializableException notSerializable = new NotSerializableException();

          // minimal JobConf so the Hadoop mapper can be configured
          JobConf conf = new JobConf(new Configuration(), wordcount.class);
          conf.setJobName("WordCount");
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class);
          Path inp = new Path("hdfs://localhost:54310/input/darcy.txt");
          Path out = new Path("hdfs://localhost:54310/output/wc-tmp"); // placeholder; this JobConf never launches an MR job
          FileInputFormat.addInputPath(conf, inp);
          FileOutputFormat.setOutputPath(conf, out);

          // a fresh mapper and collector are created for every single record of 'lines'
          WordCountMapper mapper = new WordCountMapper();
          mapper.configure(conf);
          OutputCollector<Text,IntWritable> output = new outputcollector<Text,IntWritable>();

          mapper.map(it._1, it._2, output, Reporter.NULL);

          return ((outputcollector<Text,IntWritable>) output).getList();
        });

        ones.saveAsTextFile("hdfs://localhost:54310/output/41");
  }
}

WordCountMapper.java

import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import java.io.Serializable;

public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>,Serializable
{
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            NotSerializableException notSerializable = new NotSerializableException();
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {  
               word = new Text();
               word.set(tokenizer.nextToken());
               output.collect(word, one);
            }
       }
}

outputcollector.java

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.*;
import scala.Tuple2;

public class outputcollector<K extends Object, V extends Object> implements OutputCollector<K, V>{
    private List<Tuple2<K,V>> writer = new ArrayList<Tuple2<K,V>>();
    @Override
    public void collect(K key, V value) {
        try{
            writer.add(new Tuple2<K,V>(key,value));
        }catch(Exception e){
            System.out.println(e+ "\n\n****output collector error\n\n");
        }
    }
    public List<Tuple2<K,V>> getList(){
        return writer;
    }
}

This code works fine and I can submit the Spark job successfully, but it is very inefficient compared with a pure Spark program: it takes roughly 50 times longer than a simple Spark word count example. The input file is 1 GB and sits on HDFS; the job runs in standalone mode.
I cannot find the reason why this code is so sluggish. Here WordCountMapper.java is used only to collect the (word, 1) pairs, and that happens in memory as well, so I don't see why my code should be so much slower than the standard Spark word count example.
So, can anyone suggest a better way to reuse WordCountMapper.java (the Hadoop mapper) in Spark, or explain why it is so slow? Or anything else that helps me reach my end goal (stated at the top of the question).
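
For reference, the "simple Spark word count example" used as the timing baseline above is essentially the following. This is only a minimal sketch, not code from the original job: the class name PlainSparkWordCount, the output path, and the Spark 1.x flatMap signature (which returns an Iterable) are assumptions.

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class PlainSparkWordCount {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("PlainSparkWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    JavaRDD<String> lines = ctx.textFile("hdfs://localhost:54310/input/words.txt");

    JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split("\\s+")))   // Spark 1.x: Iterable; on 2.x+ append .iterator()
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);                         // the reduce happens in Spark itself, no Hadoop reducer

    counts.saveAsTextFile("hdfs://localhost:54310/output/plain"); // placeholder output path
    ctx.stop();
  }
}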


eulz3vhy1#

The basic way to convert a MapReduce job to Spark is:

rdd.mapPartitions { partition =>
    setup()                                  // map-side setup, once per partition
    partition.map { item =>
        val output = process(item)           // the old map() logic
        if (!partition.hasNext) {
            // some cleanup code here, runs after the last item of the partition
        }
        output
    }
}
.groupByKey()
.mapPartitions { partition =>
    // similarly for the reduce code, with the same setup/cleanup pattern
}
.saveAsHadoopFile(/* params */)              // to save on HDFS
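
Applied to the classes from the question, this pattern might look roughly like the sketch below. It is a sketch rather than a tested drop-in: it reuses the wordcount, WordCountMapper and outputcollector classes posted above together with the rec RDD, and it assumes the Spark 1.x mapPartitionsToPair signature, which accepts an Iterable return value (on Spark 2.x+ you would return output.getList().iterator() instead).

// Sketch: run the question's WordCountMapper once per partition instead of once per record.
JavaPairRDD<Text, IntWritable> ones = rec.mapPartitionsToPair(partition -> {
    // setup: one JobConf, one mapper and one collector per partition
    JobConf conf = new JobConf(new Configuration(), wordcount.class);
    WordCountMapper mapper = new WordCountMapper();
    mapper.configure(conf);
    outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();

    long offset = 0;                          // arbitrary key, like s.length() in the question
    while (partition.hasNext()) {
        mapper.map(new LongWritable(offset++), new Text(partition.next()), output, Reporter.NULL);
    }
    mapper.close();                           // cleanup, mirrors the MapReduce lifecycle

    return output.getList();                  // Spark 1.x; on 2.x+: output.getList().iterator()
});

ones.saveAsTextFile("hdfs://localhost:54310/output/41");

The important difference from the code in the question is that the JobConf, the mapper and the collector are created once per partition instead of once per input line, which is exactly where the skeleton above places the setup and cleanup.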

The link below points to a two-part series on the Cloudera blog. It does not cover everything, but if you work through it you will see how to translate certain parts of a Hadoop job into Spark, for example how to do the setup and cleanup.
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
Note: I have tried converting MapReduce to Spark before and it resulted in a slower application. That may have been my own inefficiency with Scala, or Spark may simply not be well suited to batch jobs, so be aware of that as well.
