MapReduce — Hadoop 在处理 bz2 压缩的 Wikipedia 转储文件时内存不足

44u64gxh  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(534)

我正在尝试对 Wikipedia 转储文件运行一个 MapReduce 作业。我读到过，Hadoop 会“神奇地”自动解压该文件，并将其拆分后分发给各个 mapper 处理。
尽管如此，这个作业始终无法完成，日志中显示内存不足（OutOfMemoryError）错误。
我了解到一个项目 https://github.com/whym/wikihadoop/wiki ，它提供了一个名为 StreamWikiDumpInputFormat 的输入格式，但我无法直接使用它，因为我的 Mapper 和 Reducer 是针对 Hadoop 2.7 实现的。
有人能帮我吗?
编辑
我的 Job 类（作业驱动类）如下：

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import fiberClusterer.hadoop.fs.io.WholeFileInputFormat;
import uniandes.mapRed.WCMapper;
import uniandes.mapRed.WCReducer;

public class WordCounter {
    public static void main(String[] args) {
        if (args.length < 2) {
            System.exit(-1);
        }
        String entrada = args[0];
        String salida = args[1];
    try {
        ejecutarJob(entrada, salida);
    } catch (Exception e) {
        e.printStackTrace();
    }

}

public static void ejecutarJob(String entrada, String salida)
        throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = new Configuration();
    Job wcJob = Job.getInstance(conf, "WordCounter Job");
    wcJob.setJarByClass(WordCounter.class);

    wcJob.setMapperClass(WCMapper.class);

    wcJob.setMapOutputKeyClass(Text.class);
    wcJob.setMapOutputValueClass(Text.class);
    wcJob.setReducerClass(WCReducer.class);
    wcJob.setOutputKeyClass(Text.class);
    wcJob.setOutputValueClass(Text.class);
    org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    WholeFileInputFormat.setInputPaths(wcJob, new Path(entrada));
    wcJob.setInputFormatClass(WholeFileInputFormat.class);

    TextOutputFormat.setOutputPath(wcJob, new Path(salida));
    wcJob.setOutputFormatClass(TextOutputFormat.class);
    wcJob.waitForCompletion(true);
    System.out.println(wcJob.toString());
    }
}

我的 Mapper 非常简单：

import java.io.IOException;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that splits each incoming value into lines and emits every line containing the
 * substring {@code "name"}.
 *
 * <p>The output key is the string form of the current {@link Date} at the moment the line is
 * emitted; the output value is the matching line itself. The input key is ignored.
 */
public class WCMapper extends Mapper<Text, Text, Text, Text> {

    /** Shared logger; loggers are stateless, so one per class is enough. */
    private static final Log LOG = LogFactory.getLog(WCMapper.class);

    /** Reused output key, overwritten before every write to avoid per-record allocation. */
    private final Text outKey = new Text();

    /** Reused output value, overwritten before every write to avoid per-record allocation. */
    private final Text outValue = new Text();

    /**
     * Emits the lines of {@code value} that contain {@code "name"}.
     *
     * @param key     input key (unused)
     * @param value   text to be split on {@code \n} or {@code \r\n}
     * @param context task context used to emit output pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        String[] lines = value.toString().split("\\r?\\n");

        // One summary message per record at DEBUG level: logging every single line at INFO
        // would flood the task logs on large inputs.
        if (LOG.isDebugEnabled()) {
            LOG.debug("map() received a record with " + lines.length + " line(s)");
        }

        for (String line : lines) {
            if (line.contains("name")) {
                outKey.set(new Date().toString());
                outValue.set(line);
                context.write(outKey, outValue);
            }
        }
    }
}

这是我的 Reducer：

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that drops the incoming key and re-emits each grouped value under an empty-string
 * key.
 */
public class WCReducer extends Reducer<Text, Text, Text, Text> {

    /**
     * Writes one output pair per input value: an empty {@link Text} as key and a copy of the
     * value as value.
     *
     * @param key     grouping key (not used in the output)
     * @param values  values grouped under {@code key}
     * @param context task context used to emit output pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        java.util.Iterator<Text> remaining = values.iterator();
        while (remaining.hasNext()) {
            Text emptyKey = new Text("");
            Text valueCopy = new Text(remaining.next());
            context.write(emptyKey, valueCopy);
        }
    }
}

这是我用 YARN 查看日志时得到的输出：

2017-03-26 12:37:07,266 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at fiberClusterer.hadoop.fs.io.MyWholeFileReader.nextKeyValue(MyWholeFileReader.java:104)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
guz6ccqo

guz6ccqo1#

我不明白你为什么要在代码中设置这些自定义的文件输入格式。Hadoop MapReduce 框架本身就能处理压缩文件，对于 bz2、LZO 这类可拆分的压缩格式也同样适用。你只需要确保文件扩展名正确即可，在本例中应为 bz2 或 bzip2。下面的代码适用于扩展名为 bz2 的文件。
作业驱动程序（JobDriver）

Arg:
test.bz2 output

package tryout.mapred;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * {@link Tool} implementation that configures and runs a MapReduce job using
 * {@link SimpleMapper} and {@link SimpleReducer}.
 *
 * <p>Input is read with {@link TextInputFormat}; output is written through
 * {@link LazyOutputFormat} wrapping {@link TextOutputFormat}. An already existing output
 * directory is deleted recursively before the job is submitted.
 *
 * @author ramesh.b
 */
public class JobDriver extends Configured implements Tool {

    /**
     * Runs the driver through {@link ToolRunner}, prints the elapsed wall-clock time and exits
     * with the status returned by {@link #run(String[])}.
     *
     * @param args {@code args[0]} is the input path list, {@code args[1]} the output path
     * @throws Exception if {@link ToolRunner} fails to run the tool
     */
    public static void main(String[] args) throws Exception {

        long start = System.currentTimeMillis();
        int res = ToolRunner.run(new Configuration(), new JobDriver(), args);
        long end = System.currentTimeMillis();

        System.out.println("Time spent in millis " + (end - start));
        System.exit(res);
    }

    /**
     * Configures the job, submits it and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path list, {@code args[1]} the output path
     * @return {@code 0} if the job succeeded, {@code 1} if it failed, if arguments are
     *         missing, or if any exception was raised while setting it up or running it
     */
    @Override
    public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        if (args == null || args.length < 2) {
            System.err.println("Usage: JobDriver <input path(s)> <output path>");
            return 1;
        }

        try {
            String inputPath = args[0];
            Path outputPath = new Path(args[1]);

            Configuration conf = getConf();

            // Job.getInstance replaces the deprecated Job(Configuration) constructor.
            Job job = Job.getInstance(conf);
            job.setJarByClass(JobDriver.class);
            job.setJobName("Simple.0.0");

            job.setMapperClass(SimpleMapper.class);
            job.setReducerClass(SimpleReducer.class);
            job.setInputFormatClass(TextInputFormat.class);

            // Remove a previous run's output so the job does not fail on an existing directory.
            FileSystem outfs = outputPath.getFileSystem(conf);
            if (outfs.exists(outputPath)) {
                outfs.delete(outputPath, true);
                System.out.println("deleted " + outputPath);
            }

            FileInputFormat.addInputPaths(job, inputPath);

            // LazyOutputFormat registers itself as the job's output format and delegates to
            // TextOutputFormat, so no separate job.setOutputFormatClass(...) call is needed.
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, outputPath);

            return job.waitForCompletion(true) ? 0 : 1;

        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }
}

SimpleMapper.java 文件

package tryout.mapred;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SimpleMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString().trim();
        if (line.contains("xylophone"))
            context.write(key, value);
    }

}

SimpleReducer 类

package tryout.mapred;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Map;

/**
 * Reducer that discards the grouping key and writes every grouped value with a
 * {@link NullWritable} key.
 */
public class SimpleReducer extends Reducer<LongWritable, Text, NullWritable, Text> {

    /**
     * Writes each value of the group to the context, paired with {@link NullWritable#get()}.
     *
     * @param key     grouping key (not used in the output)
     * @param values  values grouped under {@code key}
     * @param context task context used to emit output pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        final NullWritable noKey = NullWritable.get();
        for (java.util.Iterator<Text> cursor = values.iterator(); cursor.hasNext();) {
            Text current = cursor.next();
            context.write(noKey, current);
        }
    }
}

相关问题