我有一个包含n
个文件的文件夹。
我正在创建一个RDD,其中包含上述文件夹的所有文件名,代码如下:
fnameRDD = spark.read.text(filepath).select(input_file_name()).distinct().rdd)
我想迭代这些RDD
元素并执行以下步骤:
1.读取每个元素的内容(每个元素是一个文件路径,因此需要通过SparkContext读取内容)
1.上面的内容应该是另一个RDD,我想将其作为参数传递给函数
1.对作为参数传递到被调用函数中的RDD执行某些步骤
我已经写了一个函数,它有步骤,我已经测试了单个文件,它的工作很好,但我已经尝试了各种事情的语法做前2个步骤,但我只是得到无效的语法每一次。
我知道我不应该使用map()
,因为我想在每次迭代中读取一个文件,这将需要sc
,但map
将在工作节点内执行,在那里不能引用sc
。
另外,我知道我可以使用wholeTextFiles()
作为替代方案,但这意味着在整个过程中,我将在内存中保存所有文件的文本,这对我来说似乎效率不高。
我也愿意听取关于不同做法的建议。
2条答案
按热度按时间ff29svar1#
可能还有其他更有效的方法,但假设你已经有了一个函数
SomeFunction(df: DataFrame[value: string])
,最简单的方法是在你的fnameRDD
上使用toLocalIterator()
,一次处理一个文件。rkkpypqq2#
我相信你在寻找递归文件查找,
spark.read.option("recursiveFileLookup", "true").text(filepathroot)
如果你把它指向你的文件的根目录,spark将遍历目录,并拾取根和子文件夹下的所有文件,这将把文件读入单个 Dataframe