python 使用wholeTextFiles处理.txt文件&想要提取文件名

ct2axkht  于 2023-05-27  发布在  Python
关注(0)|答案(1)|浏览(126)

我在python spark中使用wholeTextFiles()阅读一个.txt文件。我知道在阅读wholeTextFiles()之后,得到的rdd将是format(filepath,content)。我有很多文件要读。我想从文件路径中剪切文件名并保存到spark dataframe中,文件名的一部分作为HDFS位置中的日期文件夹。但是在保存时,我没有得到相应的文件名。有没有办法做到这一点?下面是我的代码

base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")

data1 = base_data.map(lambda x : x[0]).flatMap(lambda x : x.split('/')).filter(lambda x : x.startswith('CH'))

data2=data1.flatMap(lambda x : x.split('F_')).filter(lambda x : x.startswith('2'))

print(data1.collect())

print(data2.collect())

df.repartition(1).write.mode('overwrite').parquet(outputLoc + "/xxxxx/" + data2)

logdf = sqlContext.createDataFrame(
    [(data1, pstrt_time, pend_time, 'DeltaLoad Completed')],
    ["filename","process_start_time", "process_end_time", "status"])`

输出:

data1: ['CHNC_P0BCDNAF_20200217', 'CHNC_P0BCDNAF_20200227', 'CHNC_P0BCDNAF_20200615', 'CHNC_P0BCDNAF_20200925']

data2: ['20200217', '20200227', '20200615', '20200925']
rryofs0p

rryofs0p1#

这里有一个Scala版本,您可以轻松地将其转换为pyspark:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType 

val files = sc.wholeTextFiles("/FileStore/tables/*ZZ.txt",0) 
val res1 = files.map(line => (line._1, line._2.split("\n").flatMap(x => x.split(" ")) )).map(elem => {(elem._1, elem._2) })
val res2 = res1.flatMap {
  case (x, y) => {
    y.map(z => (x, z))
}}
val res3 = res2.map(line => (line._1, line._1.split("/")(3), line._2))
val df = res3.toDF()
val df2 = df.withColumn("s", split($"_1", "/"))
            .withColumn("f1", $"s"(3)) 
            .withColumn("f2", $"f1".cast(StringType)) // avoid issues with split subsequently
            .withColumn("filename", substring_index(col("f2"), ".", 1))
df2.show(false)
df2.repartition($"filename").write.mode("overwrite").parquet("my_parquet") // default 200 and add partitionBy as well for good measure on your `write`.

通过.drop或使用select剥离一些示例数据:

+--------------------------------+---------+-------+-------------------------------------+---------+---------+--------+
|_1                              |_2       |_3     |s                                    |f1       |f2       |filename|
+--------------------------------+---------+-------+-------------------------------------+---------+---------+--------+
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|rrr    |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|       |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|4445

...

常用的标点符号删除、空格修剪等方面。你需要适应你的文件名的情况,当然,我看不出来。
问题是你不能在一个已经分裂的东西上分裂。

相关问题