java—标识驻留在文件夹中的文件的扩展名,以便使用hadoop mapreduce处理文件

aurhwmvo  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(434)

我需要处理一个文本文件文件夹。文本文件可以是任何扩展名。
对于每个扩展,我们需要单独的自定义读取器,以便在hadoop中处理该文件。

folder1/
   Data1.pdf
   Data2.xml
   Data3.html
   Data4.txt
   Data5.csv

在文件夹中获取文件扩展名并为mr作业设置自定义输入格式的更好方法是什么?
到目前为止我所做的是
司机

FileStatus[] stati = null;
try {
    stati = fs.listStatus(in);
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}
for (FileStatus status : stati) {
    Path path = status.getPath();
    System.out.println("Path----> "+path);
    /*
     * get file extension
     */
    String ext = FilenameUtils.getExtension(path.toString());
    System.out.println("ext--->"+ext);
    if(ext.equals("pdf")){
          //custom pdf record reader
         job.setInputFormatClass(PdfInputFormat.class);
    }
    else{
         job.setInputFormatClass(TextInputFormat.class);
    }
}

但是这在folder1中不起作用,但是如果folder1只包含.pdf文件,这就可以了。
我错过什么了吗?
希望这不会正常工作,因为我正在遍历文件夹(例如folder2-->data5.pdf,data4.csv)。此setinputformatclass不会对textinputformat显示任何影响。addinputpath(job,in)因为inputformat将使用data4.csv的上一次迭代值设置。

job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    try {
        TextInputFormat.addInputPath(job, in);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    TextOutputFormat.setOutputPath(job, out);

编辑

Job job = null;
try {
 job = new Job(conf, "TextMining");
} catch (IOException e) {
 e.printStackTrace();
}
/*
 * check entension
 */
for (FileStatus status : stati) {
 Path path = status.getPath();
 System.out.println("Path----> "+path);
 /*
  * get file extension
  */
 String ext = FilenameUtils.getExtension(path.toString());
 System.out.println("ext--->"+ext);
 if(ext.equals("pdf")){
       System.out.println("Pdf File Format");
      // MultipleInputs.addInputPath(job, path,PdfInputFormat.class, PDFStemmingMapper.class);
       job.setInputFormatClass(PdfInputFormat.class);
 }
 else if(ext.equals("xlsx")){ 
       System.out.println("Excel File Format");
       job.setInputFormatClass(ExcelInputFormat.class);
 }
 else{
       System.out.println("normal Text File");
       job.setInputFormatClass(TextInputFormat.class);
 }
}
job.setJarByClass(Driver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
//job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);

//   try {
  //    TextInputFormat.addInputPath(job, in);
  //   } catch (IOException e) {
  //    e.printStackTrace();
  //   }
    TextOutputFormat.setOutputPath(job, out);

我正在努力实现上面的目标。但这并没有产生任何效果。请建议。

zzzyeukh

zzzyeukh1#

对于Map器,所需的输入是一条记录(由值指示)。inputformat处理如何构造此记录并将其传递给map方法。
例如:默认输入格式textinputformat将文件中的一行视为记录。这通常适用于txt/xsv文件。
对于其他文件类型,更好的方法是使用自定义inputformat,它知道如何表示一条记录(在xml中,记录可以是一个子块)
如果已经为所有已标识的文件类型设置了inputformat类,则可以使用多个Input。
看看这里的javadochttps://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapreduce/lib/input/multipleinputs.html
可以使用文件系统api识别扩展

uplii1fm

uplii1fm2#

从上下文中,获取输入拆分,然后从路径中获取路径和名称。 Context => getInputSplit() => getPath => getName() 一旦你知道了名字,找到 lastIndexOf(".") 从索引中提取子字符串。
现在在子字符串中有了扩展名并使用它进行比较。
编辑:
下面的方法对你可行吗?
为每种类型的扩展都有单独的Map器。
在driver类中添加以下行。

MultipleInputs.addInputPath(job, path_pdf,inputFormatClass, PDFMapper.class)

MultipleInputs.addInputPath(job, path_xml, inputFormatClass,XMLMapper.class)

MultipleInputs.addInputPath(job, path_html,inputFormatClass,HTMLMapper.class)

MultipleInputs.addInputPath(job, path_csv,inputFormatClass,CVSMapper.class)

相关问题