我有来自多个路径的csv文件,这些路径不是s3 bucket中的父目录。所有表都有相同的分区键。
s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...
我需要将这些csv文件转换成Parquet文件,并将它们存储在另一个具有相同目录结构的s3 bucket中。
另一个s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...
我有一个解决方案是遍历s3 bucket并找到csv文件,然后将其转换为parquet并保存到另一个s3路径。我发现这种方法效率不高,因为我有一个循环,并且一个文件一个文件地进行转换。
我想利用spark库来提高效率。然后,我试着:
spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')
这种方法适用于每个表,但要对其进行更多优化,我希望将表名称作为参数,例如:
TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')
谢谢
1条答案
按热度按时间llew8vvj1#
上述问题提供了同时读取多个文件的解决方案。方法
spark.read.csv(...)
接受一个或多个路径,如下所示。对于读取文件,可以应用相同的逻辑。不过,在编写时,spark会将所有给定的数据集/路径合并到一个Dataframe中。因此,如果不首先应用自定义逻辑,就不可能从一个Dataframe生成多个Dataframe。因此,没有一种方法可以将初始Dataframe直接提取到多个目录中,即df.write.csv(*TABLE_NAMES)
.好消息是spark提供了一个专用函数input\ file\ u name(),它返回当前记录的文件路径。您可以将它与表名结合使用,以筛选表名。
这里有一个可能未经测试的Pypark解决方案:
说明:
我们生成输入路径并将其存储到
input_paths
. 这将创建如下路径:s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN
.然后我们将所有路径加载到一个Dataframe中,在其中添加一个名为
file_name
,这将保留每行的路径。注意,我们还使用cache
在这里,这很重要,因为我们有多个len(TABLE_NAMES)
以下代码中的操作。使用缓存将阻止我们一次又一次地加载数据源。接下来我们创建
write_table
它负责保存给定表的数据。下一步是使用all_df["file_name"].contains(table_name)
,这将仅返回包含table_name
在file_name
列。最后,我们保存过滤后的数据。在最后一步,我们呼吁
write_table
每一项TABLE_NAMES
.相关链接
如何在一次加载中导入多个csv文件?
为序列文件格式的文件获取pyspark中的hdfs文件路径