如何使用spark读取文件夹文件?

nszi6y05  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(977)

我有一个hdfs文件夹,在这个文件夹里有很多txt文件。我想用spark读取这些文件中的内容。
我的代码:

// Create spark session
    val spark = SparkSession.builder()
                  .master("spark://master:7077")
                  .appName("Indexing data to elasticsearch")
                  .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Read folder file
    val df:DataFrame = spark.read.text("hdfs://master:9000/user/file/shakespeare")

我想从dataframe获取文件夹中每个文件的内容。我该怎么办?

b1zrtrql

b1zrtrql1#

这是RDD似乎比Dataframe更灵活的许多情况之一,因为RDD提供了wholetextfiles方法。
wholetextfiles方法基本上类似于textfile,但它不是读取所有文件中每行的输入,而是读取并存储每个文件的record/pairdd/key-value对。结果rdd的模式如下:

(path_to_file, file_contents)

(但是,使用此方法时必须非常小心,因为您可能无法预测目录下文件内容的长度,因此这可能会导致内存不足)。
假设我们有很多文本文件(a.txt,b.txt,…),每一个文件都以字母命名,它们的内容如下所示:
使用wholetextfiles方法将产生以下rdd对(您可能希望将每个文件的完整路径作为每一对的密钥以获得更好的可读性):

(hdfs:/.../.../a.txt,a aa aaa aaaa aaaaa)
(hdfs:/.../.../b.txt,b bb bbb bbbb bbbbb)
(hdfs:/.../.../c.txt,c cc ccc cccc ccccc)
(hdfs:/.../.../d.txt,d dd ddd dddd ddddd)
(hdfs:/.../.../e.txt,e ee eee eeee eeeee)

使用此方法后,根据您的问题剩下的所有工作就是将结果rdd转换为dataframe,因为每个记录的数据将分为两列(由您命名):file\u name和content。

+---------+-------------------+
|file_name|            content|
+---------+-------------------+
|    a.txt|a aa aaa aaaa aaaaa|
|    b.txt|b bb bbb bbbb bbbbb|
|    c.txt|c cc ccc cccc ccccc|
|    d.txt|d dd ddd dddd ddddd|
|    e.txt|e ee eee eeee eeeee|
+---------+-------------------+

下面的代码片段可能会发生这种情况

// create a scala spark context to use the wholeTextFiles method
val sc = spark.sparkContext

// create an RDD where the full path of each file is the key 
// and the file's content is the value,
// and get rid of the full path of the file
val input = sc.wholeTextFiles("hdfs://path/to/folder/*")
.map(file => (file._1.split('/').last, file._2))

// convert the RDD to a DataFrame and explicitly name the columns
val input_df = spark.createDataFrame(input).toDF("file_name", "content")

相关问题