我需要处理一个文本文件文件夹。文本文件可以是任何扩展名。
对于每个扩展,我们需要单独的自定义读取器,以便在hadoop中处理该文件。
folder1/
Data1.pdf
Data2.xml
Data3.html
Data4.txt
Data5.csv
在文件夹中获取文件扩展名并为mr作业设置自定义输入格式的更好方法是什么?
到目前为止我所做的是
司机
FileStatus[] stati = null;
try {
stati = fs.listStatus(in);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
for (FileStatus status : stati) {
Path path = status.getPath();
System.out.println("Path----> "+path);
/*
* get file extension
*/
String ext = FilenameUtils.getExtension(path.toString());
System.out.println("ext--->"+ext);
if(ext.equals("pdf")){
//custom pdf record reader
job.setInputFormatClass(PdfInputFormat.class);
}
else{
job.setInputFormatClass(TextInputFormat.class);
}
}
但是这在folder1中不起作用,但是如果folder1只包含.pdf文件,这就可以了。
我错过什么了吗?
希望这不会正常工作,因为我正在遍历文件夹(例如folder2-->data5.pdf,data4.csv)。此setinputformatclass不会对textinputformat显示任何影响。addinputpath(job,in)因为inputformat将使用data4.csv的上一次迭代值设置。
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
try {
TextInputFormat.addInputPath(job, in);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
TextOutputFormat.setOutputPath(job, out);
编辑
Job job = null;
try {
job = new Job(conf, "TextMining");
} catch (IOException e) {
e.printStackTrace();
}
/*
* check entension
*/
for (FileStatus status : stati) {
Path path = status.getPath();
System.out.println("Path----> "+path);
/*
* get file extension
*/
String ext = FilenameUtils.getExtension(path.toString());
System.out.println("ext--->"+ext);
if(ext.equals("pdf")){
System.out.println("Pdf File Format");
// MultipleInputs.addInputPath(job, path,PdfInputFormat.class, PDFStemmingMapper.class);
job.setInputFormatClass(PdfInputFormat.class);
}
else if(ext.equals("xlsx")){
System.out.println("Excel File Format");
job.setInputFormatClass(ExcelInputFormat.class);
}
else{
System.out.println("normal Text File");
job.setInputFormatClass(TextInputFormat.class);
}
}
job.setJarByClass(Driver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
//job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
// try {
// TextInputFormat.addInputPath(job, in);
// } catch (IOException e) {
// e.printStackTrace();
// }
TextOutputFormat.setOutputPath(job, out);
我正在努力实现上面的目标。但这并没有产生任何效果。请建议。
2条答案
按热度按时间zzzyeukh1#
对于Map器,所需的输入是一条记录(由值指示)。inputformat处理如何构造此记录并将其传递给map方法。
例如:默认输入格式textinputformat将文件中的一行视为记录。这通常适用于txt/xsv文件。
对于其他文件类型,更好的方法是使用自定义inputformat,它知道如何表示一条记录(在xml中,记录可以是一个子块)
如果已经为所有已标识的文件类型设置了inputformat类,则可以使用多个Input。
看看这里的javadochttps://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapreduce/lib/input/multipleinputs.html
可以使用文件系统api识别扩展
uplii1fm2#
从上下文中,获取输入拆分,然后从路径中获取路径和名称。
Context => getInputSplit() => getPath => getName()
一旦你知道了名字,找到lastIndexOf(".")
从索引中提取子字符串。现在在子字符串中有了扩展名并使用它进行比较。
编辑:
下面的方法对你可行吗?
为每种类型的扩展都有单独的Map器。
在driver类中添加以下行。