在spark scala中从dataframe列中的数组添加文件名

ac1kyiln  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(437)
val hadoopConf = new Configuration()

val fs = FileSystem.get(hadoopConf)

val status = fs.listStatus(new Path("/home/Test/")).map(_.getPath().toString)

val df =  spark.read.format("json").load(status : _*)

如何在df的新列中添加文件名?
我试过:

val dfWithCol = df.withColumn("filename",input_file_name())

但它在所有列中添加相同的文件名?有人能提出更好的方法吗?

ljo96ir5

ljo96ir51#

这是预期的行为,因为您的json文件 more than one record 在里面。
spark增加了 filenames 对于每个记录,如果要检查所有唯一的文件名,请执行以下操作 distinct 在文件名列上

//to get unique filenames
df.select("filename").distinct().show()
``` `Example:` ```

# source data

hadoop fs -cat /user/shu/json/*.json
{"id":1,"name":"a"}
{"id":1,"name":"a"}
val hadoopConf = new Configuration()

val fs = FileSystem.get(hadoopConf)

val status = fs.listStatus(new Path("/user/shu/json")).map(_.getPath().toString)

val df =  spark.read.format("json").load(status : _*)

df.withColumn("filename",input_file_name()).show(false)

//unique filenames for each record
+---+----+----------------------------------------------------------------------------+
|id |name|input                                                                       |
+---+----+----------------------------------------------------------------------------+
|1  |a   |hdfs://nn:8020/user/shu/json/i.json                                         |
|1  |a   |hdfs://nn:8020/user/shu/json/i1.json                                        |
+---+----+----------------------------------------------------------------------------+

在上面的例子中你可以看到 unique filenames 对于每条记录 (as i have 1 record in each json file) .

相关问题