Hi, I have code written for Hadoop MapReduce and I am now trying to migrate it to Spark. The mappers and reducers are fairly complex, so I would like to reuse the existing Hadoop mapper and reducer classes in my Spark program. Can someone tell me how to achieve this?
Edit:
So far I have been able to reuse the mapper class of the standard Hadoop word-count example in Spark, implemented as follows.
wordcount.java
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class wordcount extends Configured implements Serializable {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
                .setMaster("spark://IMPETUS-I0203:7077")
                .setAppName("wordcount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);   // create the Spark context

        // Record reader: one String per line of the input file
        JavaRDD<String> rec = ctx.textFile("hdfs://localhost:54310/input/words.txt");

        // Pair RDD whose key = some arbitrary number (here the line length), value = a record
        JavaPairRDD<LongWritable, Text> lines =
                rec.mapToPair(s -> new Tuple2<LongWritable, Text>(new LongWritable(s.length()), new Text(s)));

        // Transform 'lines' so that each record is run through the Hadoop mapper and
        // ('word', 1) tuples are returned. Note: the JobConf, input path and mapper are
        // constructed again for every single input line inside this lambda.
        JavaPairRDD<Text, IntWritable> ones = lines.flatMapToPair(it -> {
            NotSerializableException notSerializable = new NotSerializableException(); // unused
            JobConf conf = new JobConf(new Configuration(), wordcount.class);
            conf.setJobName("WordCount");
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            Path inp = new Path("hdfs://localhost:54310/input/darcy.txt");
            FileInputFormat.addInputPath(conf, inp);
            // no FileOutputFormat output path is set; results are written below with saveAsTextFile

            WordCountMapper mapper = new WordCountMapper();
            mapper.configure(conf);

            outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();
            mapper.map(it._1, it._2, output, Reporter.NULL);
            return output.getList();
        });

        ones.saveAsTextFile("hdfs://localhost:54310/output/41");
    }
}
WordCountMapper.java
import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import java.io.Serializable;
public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable>, Serializable {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        NotSerializableException notSerializable = new NotSerializableException(); // unused
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word = new Text();              // a fresh Text per token
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
outputcollector.java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.*;
import scala.Tuple2;
public class outputcollector<K extends Object, V extends Object> implements OutputCollector<K, V> {

    // collects the (key, value) pairs emitted by the mapper into an in-memory list
    private List<Tuple2<K, V>> writer = new ArrayList<Tuple2<K, V>>();

    @Override
    public void collect(K key, V value) {
        try {
            writer.add(new Tuple2<K, V>(key, value));
        } catch (Exception e) {
            System.out.println(e + "\n\n****output collector error\n\n");
        }
    }

    public List<Tuple2<K, V>> getList() {
        return writer;
    }
}
This code works fine and I can submit the Spark job successfully, but it is very inefficient compared to a pure Spark program: it takes roughly 50 times longer than a simple Spark word-count example. The input file is 1 GB and sits on HDFS, and the job runs in standalone mode.
I cannot find the reason why this code is so sluggish. Here WordCountMapper.java is used only to collect the (word, 1) pairs, and that work happens in memory as well, so I do not understand why my code is so much slower than the standard Spark word-count example.
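For reference, the simple Spark word count I compared against is essentially the standard Java example, roughly the following fragment inside the same driver (ctx as above; the output directory is just an illustrative, otherwise unused path):

// Plain Spark word count used as the baseline.
JavaRDD<String> file = ctx.textFile("hdfs://localhost:54310/input/words.txt");
JavaPairRDD<String, Integer> counts = file
        .flatMap(line -> Arrays.asList(line.split("\\s+")))   // Spark 1.x returns Iterable; append .iterator() on 2.x+
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        .reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("hdfs://localhost:54310/output/plain"); // any unused output directory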
So, can anyone suggest a better way to reuse WordCountMapper.java (the Hadoop mapper) in Spark? Or explain why it is so slow? Or anything else that helps me reach my end goal (mentioned at the top of my question)?
1 Answer
The basic way to convert MapReduce to Spark is covered by the link below, which points to a two-part series on the Cloudera blog. Not everything is discussed there, but if you work through it you will see how to translate certain parts of a Hadoop job into Spark, for example how to handle setup and cleanup.
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
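Applied to the code in your question, one way to keep the existing WordCountMapper but still get Hadoop-style setup and cleanup (and to stop rebuilding the JobConf and mapper for every record) is to replace the flatMapToPair call with mapPartitionsToPair. This is only a sketch, assuming the WordCountMapper and outputcollector classes from your question and a 'lines' RDD built the same way as above:

// One JobConf, one mapper and one collector per partition instead of per record.
JavaPairRDD<Text, IntWritable> ones = lines.mapPartitionsToPair(it -> {
    JobConf conf = new JobConf();                         // setup, once per partition
    WordCountMapper mapper = new WordCountMapper();
    mapper.configure(conf);
    outputcollector<Text, IntWritable> collector = new outputcollector<Text, IntWritable>();
    while (it.hasNext()) {
        Tuple2<LongWritable, Text> record = it.next();
        mapper.map(record._1, record._2, collector, Reporter.NULL);
    }
    mapper.close();                                       // cleanup, once per partition
    return collector.getList();                           // Spark 1.x (Iterable); use .iterator() on 2.x+
});
ones.saveAsTextFile("hdfs://localhost:54310/output/41");

If you later shuffle these pairs (for example with reduceByKey), it is safer to map the Hadoop Writables to plain String/Integer first, since Writables are not Java-serializable.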
Note: I have tried converting MapReduce to Spark myself, and it resulted in a slower application. Maybe that was my own inefficiency with Scala, or maybe Spark is simply not well suited for batch jobs. So be aware of that as well.