我有一个scala代码,它使用通配符从hdfs中获取多个输入文件,每个文件进入一个函数,在这个函数中分别对每个文件进行处理。
import de.l3s.boilerpipe.extractors.KeepEverythingExtractor
val data = sc.wholeTextFiles("hdfs://localhost:port/akshat/folder/*/*")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String): (String,String) = {
// logic of processing a single file comes here
val logData = sc.textFile(file);
val c = logData.toLocalIterator.mkString
val d = KeepEverythingExtractor.INSTANCE.getText(c)
val e = sc.parallelize(d.split("\n"))
val recipeName = e.take(10).last
val prepTime = e.take(18).last
(recipeName,prepTime)
}
//How transformation and action applied here?
我被困在如何应用进一步的转换和操作,以便根据函数dosomethingMap所有输入文件,并使用saveastextfile将每个输入文件的所有输出存储在单个文件中。
1条答案
按热度按时间yx2lnoni1#
因此,如果我的理解是正确的,那么您有一个rdd对,您希望对它进行更多的转换,然后将每个键的输出保存到一个唯一的文件中。再转化一些相对容易,
mapValue
将允许您仅对值编写转换,其他任何转换都将对成对的RDD工作。但是,将每个键的输出保存到一个唯一的文件有点棘手。一个选择是尝试找到一个hadoopoutput格式,该格式满足您的需要,然后使用它
saveAsHadoopFile
,另一种选择是foreach
然后只需编写代码即可根据需要输出每个键/值对。