java—如何在hadoop中使用multipleoutputs类输出具有特定扩展名(如.csv)的文件

rjee0c15  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(282)

我现在有一个mapreduce程序 MultipleOutputs 将结果输出到多个文件中。减速器如下所示:

private MultipleOutputs mo = new MultipleOutputs<NullWritable, Text>(context);
...
public void reduce(Edge keys, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        String date = records.formatDate(millis);
        out.set(keys.get(0) + "\t" + keys.get(1));
        parser.parse(key); 
        String filePath = String.format("%s/part", parser.getFileID());
        mo.write(noval, out, filePath);
    }

这与《hadoop:the definitive guide》一书中的示例非常相似,但问题是它将文件输出为纯文本。我想我的文件输出为.csv文件,但还没有设法找到一本书或网上对它的解释。
如何做到这一点?

xpcnnkqh

xpcnnkqh1#

在驱动程序中的作业对象完成后,是否尝试遍历输出文件夹以重命名文件?
只要您在reducer中发出(文本应该是csv中用分号或任何您需要的分隔符分隔的值的行),您就可以尝试这样做:

Job job = new Job(getConf());
//...
//your job setup, including the output config 
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//...
boolean success = job.waitForCompletion(true);
if (success){
    FileSystem hdfs = FileSystem.get(getConf());
    FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
    if (fs != null){ 
        for (FileStatus aFile : fs) {
            if (!aFile.isDir()) {
                hdfs.rename(aFile.getPath(), new Path(aFile.getPath().toString()+".csv"));
            }
        }
    }
}

相关问题