我在python spark中使用wholeTextFiles()
阅读一个.txt文件。我知道在阅读wholeTextFiles()
之后,得到的rdd将是format(filepath,content)。我有很多文件要读。我想从文件路径中剪切文件名并保存到spark dataframe中,文件名的一部分作为HDFS位置中的日期文件夹。但是在保存时,我没有得到相应的文件名。有没有办法做到这一点?下面是我的代码
base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")
data1 = base_data.map(lambda x : x[0]).flatMap(lambda x : x.split('/')).filter(lambda x : x.startswith('CH'))
data2=data1.flatMap(lambda x : x.split('F_')).filter(lambda x : x.startswith('2'))
print(data1.collect())
print(data2.collect())
df.repartition(1).write.mode('overwrite').parquet(outputLoc + "/xxxxx/" + data2)
logdf = sqlContext.createDataFrame(
[(data1, pstrt_time, pend_time, 'DeltaLoad Completed')],
["filename","process_start_time", "process_end_time", "status"])`
输出:
data1: ['CHNC_P0BCDNAF_20200217', 'CHNC_P0BCDNAF_20200227', 'CHNC_P0BCDNAF_20200615', 'CHNC_P0BCDNAF_20200925']
data2: ['20200217', '20200227', '20200615', '20200925']
1条答案
按热度按时间rryofs0p1#
这里有一个Scala版本,您可以轻松地将其转换为pyspark:
通过
.drop
或使用select
剥离一些示例数据:常用的标点符号删除、空格修剪等方面。你需要适应你的文件名的情况,当然,我看不出来。
问题是你不能在一个已经分裂的东西上分裂。