Apache Spark 遍历RDD元素,读取其内容以供进一步处理

sqxo8psd  于 2023-01-21  发布在  Apache
关注(0)|答案(2)|浏览(153)

我有一个包含n个文件的文件夹。
我正在创建一个RDD,其中包含上述文件夹的所有文件名,代码如下:

fnameRDD = spark.read.text(filepath).select(input_file_name()).distinct().rdd)

我想迭代这些RDD元素并执行以下步骤:
1.读取每个元素的内容(每个元素是一个文件路径,因此需要通过SparkContext读取内容)
1.上面的内容应该是另一个RDD,我想将其作为参数传递给函数
1.对作为参数传递到被调用函数中的RDD执行某些步骤
我已经写了一个函数,它有步骤,我已经测试了单个文件,它的工作很好,但我已经尝试了各种事情的语法做前2个步骤,但我只是得到无效的语法每一次。
我知道我不应该使用map(),因为我想在每次迭代中读取一个文件,这将需要sc,但map将在工作节点内执行,在那里不能引用sc
另外,我知道我可以使用wholeTextFiles()作为替代方案,但这意味着在整个过程中,我将在内存中保存所有文件的文本,这对我来说似乎效率不高。
我也愿意听取关于不同做法的建议。

ff29svar

ff29svar1#

可能还有其他更有效的方法,但假设你已经有了一个函数SomeFunction(df: DataFrame[value: string]),最简单的方法是在你的fnameRDD上使用toLocalIterator(),一次处理一个文件。

for x in fnameRDD.toLocalIterator():
  fileContent = spark.read.text(x[0])
# fileContent.show(truncate=False)
  SomeFunction(fileContent)
rkkpypqq

rkkpypqq2#

我相信你在寻找递归文件查找,
spark.read.option("recursiveFileLookup", "true").text(filepathroot)
如果你把它指向你的文件的根目录,spark将遍历目录,并拾取根和子文件夹下的所有文件,这将把文件读入单个 Dataframe

相关问题