When I tried to run a MapReduce job over a Wikipedia dump file, I had read that Hadoop would magically decompress the file and split it across the mappers for processing.
However, the job didn't complete, and the logs show an out-of-memory error.
I have read about a project, https://github.com/whym/wikihadoop/wiki , which provides an input format called StreamWikiDumpInputFormat, but I can't use it directly because my mapper and reducer are implemented for Hadoop 2.7.
Can anyone help me?
EDIT
My job class looks like this:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import fiberClusterer.hadoop.fs.io.WholeFileInputFormat;
import uniandes.mapRed.WCMapper;
import uniandes.mapRed.WCReducer;

public class WordCounter {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.exit(-1);
        }
        String entrada = args[0];
        String salida = args[1];
        try {
            ejecutarJob(entrada, salida);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void ejecutarJob(String entrada, String salida)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job wcJob = Job.getInstance(conf, "WordCounter Job");
        wcJob.setJarByClass(WordCounter.class);

        wcJob.setMapperClass(WCMapper.class);
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(Text.class);

        wcJob.setReducerClass(WCReducer.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(Text.class);

        WholeFileInputFormat.setInputPaths(wcJob, new Path(entrada));
        wcJob.setInputFormatClass(WholeFileInputFormat.class);

        TextOutputFormat.setOutputPath(wcJob, new Path(salida));
        wcJob.setOutputFormatClass(TextOutputFormat.class);

        wcJob.waitForCompletion(true);
        System.out.println(wcJob.toString());
    }
}
My mapper is very simple:
import java.io.IOException;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<Text, Text, Text, Text> {

    Log log = LogFactory.getLog(WCMapper.class);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // Split the value (the whole file contents) back into individual lines.
        String lines[] = value.toString().split("\\r?\\n");
        log.info("line");
        for (String line : lines) {
            log.info("line");
            if (line.contains("name")) {
                context.write(new Text((new Date()).toString()), new Text(line));
            }
        }
    }
}
And here is my reducer:
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text iw : values) {
            context.write(new Text(""), new Text(iw));
        }
    }
}
This is the output when I inspect the logs with YARN:
2017-03-26 12:37:07,266 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at fiberClusterer.hadoop.fs.io.MyWholeFileReader.nextKeyValue(MyWholeFileReader.java:104)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
1 Answer
I don't know why you are setting all of those file formats in your code. The Hadoop MapReduce framework takes care of compressed files, and splittable compression such as bz2 and LZO works as well. All you need to make sure of is that the file extension is correct, in this case .bz2 or .bzip2. The code below was used with the .bz2 file extension.
Job driver
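A minimal sketch of such a driver (SimpleDriver and the plain word-count setup are only illustrative placeholders; the point is that no custom input format is needed, since the default TextInputFormat picks the BZip2 codec from the .bz2 extension and splits the input line by line):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SimpleDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "simple wordcount over bz2 dump");
        job.setJarByClass(SimpleDriver.class);

        job.setMapperClass(SimpleMapper.class);
        job.setReducerClass(SimpleReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Default line-oriented input: Hadoop detects the compression codec
        // from the file extension (.bz2) and decompresses transparently.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}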
SimpleMapper.java
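A matching line-oriented mapper, again only a sketch of the usual word-count pattern (each map() call receives one line of the decompressed dump rather than the whole file):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SimpleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tokenize the single input line and emit (word, 1) pairs.
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}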
SimpleReducer.java
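And a reducer that sums the counts per word, in the same sketch style:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SimpleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Packaged into a jar, the job can then be submitted against the compressed dump directly, for example: hadoop jar simplewc.jar SimpleDriver /path/to/enwiki-latest-pages-articles.xml.bz2 /path/to/output (the jar name and paths are placeholders).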