scala 如何用spark sc.textFile获取文件名?

fnvucqvd  于 2022-12-13  发布在  Scala
关注(0)|答案(2)|浏览(204)

我正在使用以下代码阅读文件目录:

val data = sc.textFile("/mySource/dir1/*")

现在我的data rdd包含目录中所有文件的所有行(对吗?)
现在我想用源文件名为每行添加一列,该怎么做呢?
我尝试的其他选项是使用整个文本文件,但我一直得到内存异常。5个服务器24个核心24 GB(执行核心5执行内存5G)有什么想法?

yqkkidmi

yqkkidmi1#

你可以使用这个代码。我已经用Spark 1.4和1.5测试过了。
它从inputSplit获取文件名,并使用NewHadoopRDDmapPartitionsWithInputSplit将其添加到每一行

import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

val sc = new SparkContext(new SparkConf().setMaster("local"))

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration)

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
           .mapPartitionsWithInputSplit((inputSplit, iterator) => {
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map(tup => (file.getPath, tup._2))
  }
)

linesWithFileNames.foreach(println)
wydwbb8l

wydwbb8l2#

我认为现在回答这个问题已经很晚了,但我找到了一个简单的方法来完成你所寻找的:

  • 步骤0:from pyspark.sql import functions as F
  • 第1步:像往常一样使用RDD创建DataFrame。
  • 步骤2:使用input_file_name()
df.withColumn("INPUT_FILE", F.input_file_name())

这将向您的DataFrame添加一个列,其源文件名为。

相关问题