I have a requirement where I need to download certain .zip files from an FTP server and push the contents of those archives (which are XML files) to HDFS (the Hadoop Distributed File System). So far I am using Apache Commons Net's FTPClient to connect to the FTP server and first download the files to the local machine. Later I unzip those same files and hand the folder path to a method that iterates over the local folder and pushes the files to HDFS. To make this easier to follow, I have attached some code snippets below.
// Gives me an active FTPClient
FTPClient ftpClient = getActiveFTPConnection();
ftpClient.changeWorkingDirectory(remoteDirectory);
FTPFile[] ftpFiles = ftpClient.listFiles();
if (ftpFiles.length <= 0) {
    logger.info("Unable to find any files in given location!");
    return;
}
// Iterate over the files
for (FTPFile eachFTPFile : ftpFiles) {
    String ftpFileName = eachFTPFile.getName();
    // Skip anything that is not a .zip file
    if (!ftpFileName.endsWith(".zip")) {
        continue;
    }
    System.out.println("Reading file --> " + ftpFileName);
    /*
     * location is the path on the local system given by the user,
     * usually loaded from a property file.
     *
     * archiveFileLocation is where the archives downloaded
     * from FTP are stored.
     */
    String archiveFileLocation = location + File.separator + ftpFileName;
    // replaceAll() treats its first argument as a regex, so strip the
    // ".zip" extension explicitly instead.
    String localDirName = ftpFileName.substring(0, ftpFileName.length() - ".zip".length());
    /*
     * localDirLocation is the folder, named after the archive on the
     * FTP server, into which the archive's files are extracted.
     */
    String localDirLocation = location + File.separator + localDirName;
    File localDir = new File(localDirLocation);
    localDir.mkdir();
    File archiveFile = new File(archiveFileLocation);
    try (FileOutputStream archiveFileOutputStream = new FileOutputStream(archiveFile)) {
        ftpClient.retrieveFile(ftpFileName, archiveFileOutputStream);
    }
    // Schedule the archive for deletion on JVM exit, after its contents
    // have been copied.
    FileUtils.forceDeleteOnExit(archiveFile);
    // Read the archive back from archiveFileLocation.
    ZipFile zip = new ZipFile(archiveFileLocation);
    Enumeration<? extends ZipEntry> entries = zip.entries();
    while (entries.hasMoreElements()) {
        ZipEntry entry = entries.nextElement();
        if (entry.isDirectory()) {
            logger.info("Extracting directory " + entry.getName());
            // Create the directory inside localDir, not the working directory.
            new File(localDir, entry.getName()).mkdirs();
            continue;
        }
        logger.info("Extracting file: " + entry.getName());
        try (InputStream entryIn = zip.getInputStream(entry);
             OutputStream entryOut = new FileOutputStream(new File(localDir, entry.getName()))) {
            IOUtils.copy(entryIn, entryOut);
        }
    }
    zip.close();
    /*
     * Iterate over the extracted folder and load the files into HDFS.
     */
    loadFilesToHDFS(localDirLocation);
}
disconnectFTP();
Now, the problem with this approach is that the application takes a lot of time to download the files to a local path, unzip them, and then load them into HDFS. Is there a better way to extract the contents of the zip from FTP on the fly and feed loadFilesToHDFS() a stream of the contents directly, rather than a path on the local system?
1 Answer
Use zip streams. See here: http://www.oracle.com/technetwork/articles/java/compress-1565076.html
Specifically, see Code Sample 1 there.
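To connect that idea back to the question: the stream returned by Commons Net's FTPClient.retrieveFileStream() can be wrapped in a java.util.zip.ZipInputStream, so each entry can be read and forwarded without ever writing the archive to local disk. The sketch below demonstrates only the streaming extraction step, using an in-memory zip in place of the FTP and HDFS endpoints (those endpoints, and the idea of swapping the Map for an HDFS output stream, are assumptions noted in the comments, not tested code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipStreamDemo {

    /**
     * Reads zip entries straight from an InputStream, without first
     * writing the archive to local disk. In the FTP-to-HDFS case the
     * stream would come from ftpClient.retrieveFileStream(ftpFileName)
     * (followed by ftpClient.completePendingCommand()), and each
     * entry's bytes would be copied to an output stream opened via
     * Hadoop's FileSystem.create(...) instead of being collected
     * into a Map as done here for demonstration.
     */
    public static Map<String, byte[]> extractEntries(InputStream in) throws IOException {
        Map<String, byte[]> files = new LinkedHashMap<>();
        try (ZipInputStream zin = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue; // directories can be created lazily on the HDFS side
                }
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[8192];
                int n;
                while ((n = zin.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                files.put(entry.getName(), buf.toByteArray());
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny zip in memory to stand in for the FTP stream.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            zos.putNextEntry(new ZipEntry("sample.xml"));
            zos.write("<doc/>".getBytes());
            zos.closeEntry();
        }
        Map<String, byte[]> files =
                extractEntries(new ByteArrayInputStream(bos.toByteArray()));
        files.forEach((name, bytes) ->
                System.out.println(name + " (" + bytes.length + " bytes)"));
    }
}
```

One caveat when adapting this to FTP: retrieveFileStream() returns null if the transfer could not be started, and completePendingCommand() must be called (and its return value checked) after the stream is fully read and closed, before the same connection can be reused for the next file.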