hadoop 1 input file=1输出文件,仅Map

vshtjzan  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(261)

我是hadoop新手,但这是我上个月的一个学习项目。
为了使这一点足够模糊,以便对其他人有用,让我先抛出基本目标。。。。假设:
你有一个很大的数据集(显然),有数百万个基本的ascii文本文件。
每个文件都是一个“记录”
记录存储在目录结构中,以标识客户和日期
e、 g./user/hduser/data/customer1/yyyy-mm-dd,/user/hduser/data/customer2/yyyy-mm-dd
您想要模拟输出结构的输入结构
e、 g./user/hduser/out/customer1/yyyy-mm-dd,/user/hduser/out/customer2/yyyy-mm-dd
我研究了多个线程:
多输出路径java hadoop mapreduce
新api中的multipletextoutputformat替代方案
hadoop mapreduce中的独立输出文件
推测性任务执行——尝试解决-m-part#####问题
还有更多。。我也一直在读汤姆怀特的hadoop书。我一直很想学这个。我经常在新的api和旧的api之间交换,这增加了试图学习这个api的困惑。
许多人指出multipleoutputs(或旧的api版本),但我似乎无法生成所需的输出——例如,multipleoutputs似乎不接受“/”来在write()中创建目录结构
需要采取哪些步骤来创建具有所需输出结构的文件?目前,我有一个wholefileinputformat类和相关的recordreader,它有一个(nullwritable k,bytewritable v)对(如果需要可以更改)
我的Map设置:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/
        mos = new MultipleOutputs(context);
    }
}

还有一个cleanup()函数调用mos.close(),map()函数目前是一个未知函数(这里我需要帮助)
这些信息足以让新手找到答案吗?我接下来的想法是在每个map()任务中创建一个multipleoutputs()对象,每个对象都有一个新的baseoutput字符串,但我不确定它是否有效,甚至不确定应该采取什么样的操作。
建议会很感激,在这一点上,程序中的任何东西都可以改变,除了输入——我只是想学习框架——但我希望尽可能接近这个结果(稍后我可能会考虑将记录合并到更大的文件中,但它们已经是每个记录20mb,我想在我无法在记事本上阅读之前确保它能正常工作
编辑:这个问题可以通过修改/扩展textoutputformat.class来解决吗?似乎它可能有一些方法可以工作,但我不确定哪些方法我需要覆盖。。。

anhgbhbe

anhgbhbe1#

如果关闭推测执行,则不会阻止您在Map器中手动创建输出文件夹结构/文件,并将记录写入其中(忽略输出上下文/收集器)
例如,扩展代码段(setup方法),可以执行以下操作(这基本上是多个输出正在执行的操作,但假设关闭了推测执行以避免两个Map任务试图写入同一输出文件时发生文件冲突):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiOutputsMapper extends
        Mapper<LongWritable, Text, NullWritable, NullWritable> {
    protected String filenameKey;
    private RecordWriter<Text, Text> writer;
    private Text outputValue;
    private Text outputKey;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // operate on the input record
        // ...

        // write to output file using writer rather than context
        writer.write(outputKey, outputValue);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenameKey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenameKey);

        // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder
        TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output subfolder
        writer = tof.getRecordWriter(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}

需要考虑的几点:
customerx/yyyy-MM-dd 路径文件或文件夹文件(如果是文件夹文件,则需要相应修改-此实现假定每个日期有一个文件,文件名为yyyy-mm-dd)
您可能希望查看lazyoutputformat以防止创建空的输出Map文件

相关问题