我正在使用以下代码阅读文件目录:
val data = sc.textFile("/mySource/dir1/*")
现在我的data rdd包含目录中所有文件的所有行(对吗?)现在我想用源文件名为每行添加一列,该怎么做呢?我尝试的其他选项是使用整个文本文件,但我一直得到内存异常。5个服务器24个核心24 GB(执行核心5执行内存5G)有什么想法?
data
yqkkidmi1#
你可以使用这个代码。我已经用Spark 1.4和1.5测试过了。它从inputSplit获取文件名,并使用NewHadoopRDD的mapPartitionsWithInputSplit将其添加到每一行
inputSplit
NewHadoopRDD
mapPartitionsWithInputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat} import org.apache.spark.rdd.{NewHadoopRDD} import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text val sc = new SparkContext(new SparkConf().setMaster("local")) val fc = classOf[TextInputFormat] val kc = classOf[LongWritable] val vc = classOf[Text] val path :String = "file:///home/user/test" val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration) val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]] .mapPartitionsWithInputSplit((inputSplit, iterator) => { val file = inputSplit.asInstanceOf[FileSplit] iterator.map(tup => (file.getPath, tup._2)) } ) linesWithFileNames.foreach(println)
wydwbb8l2#
我认为现在回答这个问题已经很晚了,但我找到了一个简单的方法来完成你所寻找的:
from pyspark.sql import functions as F
input_file_name()
df.withColumn("INPUT_FILE", F.input_file_name())
这将向您的DataFrame添加一个列,其源文件名为。
DataFrame
2条答案
按热度按时间yqkkidmi1#
你可以使用这个代码。我已经用Spark 1.4和1.5测试过了。
它从
inputSplit
获取文件名,并使用NewHadoopRDD
的mapPartitionsWithInputSplit
将其添加到每一行wydwbb8l2#
我认为现在回答这个问题已经很晚了,但我找到了一个简单的方法来完成你所寻找的:
from pyspark.sql import functions as F
input_file_name()
这将向您的
DataFrame
添加一个列,其源文件名为。