我有大量的csv文件需要转换成Parquet文件,使用pyspark。一个csv是一个Parquet地板。
输入:csv文件:
000.csv
001.csv
002.csv
...
输出:qarquet文件:
000.parquet
001.parquet
002.parquet
...
我目前的解决方案是:
for each_csv in same_folder:
df = spark.read.csv(each_csv, header = True)
df.write.parquet(output_folder)
for循环很贵。有没有什么方法可以利用spark进行批处理?例如。
spark.read.csv(相同的文件夹/).write.parquet(输出文件夹/)
根据quicksilver的回答,以下是我的Pypark版本:
spark = SparkSession.builder.master("local[*]").appName("csv_to_parquet").getOrCreate()
# Read csv files into a single data frame and add a column of input file names:
baseDf = spark.read.csv("input_folder/*.csv").withColumn("input_file_name", input_file_name())
# Convert file names into a list:
filePathInfo = baseDf.select("input_file_name").distinct().collect()
filePathInfo_array = list(map(lambda row: row.input_file_name, filePathInfo))
# Write to parquet:
map(lambda csvFileName: baseDf.filter(col("input_file_name").endsWith(csvFileName)).write.mode('overwrite').parquet(f'output_folder/{csvFileName}'), filePathInfo_array)
2条答案
按热度按时间s4chpxco1#
您可以使用globbing模式来选择文件,也可以提供文件列表。
如果文件夹中有两个文件
/tmp/file1_csv/file1.csv
以及/tmp/file2_csv/file2.csv
,我可以使用以下或者,如果您有奇怪的路径,也可以使用重载版本的
csv
方法。gjmwrych2#
您可以按照以下步骤来避免spark中的多个文件加载:,
使用源csv文件夹加载Dataframe
列
input_file_name
哪个记录源文件名将文件名收集到一个列表中
遍历文件名列表
在文件名列表循环中,
按文件名筛选Dataframe
写入相应文件
scala中的sudo工作代码