apachespark中的scala转换和操作

omqzjyyz  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(372)

我有一个scala代码,它使用通配符从hdfs中获取多个输入文件,每个文件进入一个函数,在这个函数中分别对每个文件进行处理。

import de.l3s.boilerpipe.extractors.KeepEverythingExtractor

    val data = sc.wholeTextFiles("hdfs://localhost:port/akshat/folder/*/*")

    val files = data.map { case (filename, content) => filename}

    def doSomething(file: String): (String,String) = { 

     // logic of processing a single file comes here

     val logData = sc.textFile(file);
     val c = logData.toLocalIterator.mkString
     val d = KeepEverythingExtractor.INSTANCE.getText(c)
     val e = sc.parallelize(d.split("\n"))
     val recipeName = e.take(10).last
     val prepTime = e.take(18).last

     (recipeName,prepTime)
    }

    //How transformation and action applied here?

我被困在如何应用进一步的转换和操作,以便根据函数dosomethingMap所有输入文件,并使用saveastextfile将每个输入文件的所有输出存储在单个文件中。

yx2lnoni

yx2lnoni1#

因此,如果我的理解是正确的,那么您有一个rdd对,您希望对它进行更多的转换,然后将每个键的输出保存到一个唯一的文件中。再转化一些相对容易, mapValue 将允许您仅对值编写转换,其他任何转换都将对成对的RDD工作。
但是,将每个键的输出保存到一个唯一的文件有点棘手。一个选择是尝试找到一个hadoopoutput格式,该格式满足您的需要,然后使用它 saveAsHadoopFile ,另一种选择是 foreach 然后只需编写代码即可根据需要输出每个键/值对。

相关问题