将多个目录中的csv文件转换为pyspark中的parquet

nwlqm0z1  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(666)

我有来自多个路径的csv文件,这些路径不是s3 bucket中的父目录。所有表都有相同的分区键。
s3的目录:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...

我需要将这些csv文件转换成Parquet文件,并将它们存储在另一个具有相同目录结构的s3 bucket中。
另一个s3的目录:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...

我有一个解决方案是遍历s3 bucket并找到csv文件,然后将其转换为parquet并保存到另一个s3路径。我发现这种方法效率不高,因为我有一个循环,并且一个文件一个文件地进行转换。
我想利用spark库来提高效率。然后,我试着:

spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')

这种方法适用于每个表,但要对其进行更多优化,我希望将表名称作为参数,例如:

TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')

谢谢

llew8vvj

llew8vvj1#

上述问题提供了同时读取多个文件的解决方案。方法 spark.read.csv(...) 接受一个或多个路径,如下所示。对于读取文件,可以应用相同的逻辑。不过,在编写时,spark会将所有给定的数据集/路径合并到一个Dataframe中。因此,如果不首先应用自定义逻辑,就不可能从一个Dataframe生成多个Dataframe。因此,没有一种方法可以将初始Dataframe直接提取到多个目录中,即 df.write.csv(*TABLE_NAMES) .
好消息是spark提供了一个专用函数input\ file\ u name(),它返回当前记录的文件路径。您可以将它与表名结合使用,以筛选表名。
这里有一个可能未经测试的Pypark解决方案:

from pyspark.sql.functions import input_file_name 

TABLE_NAMES = [table_name_1, table_name_2, ...]

source_path = "s3n://bucket_name/"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]

all_df = spark.read.csv(*input_paths) \
              .withColumn("file_name", input_file_name()) \
              .cache()

dest_path = "s3n://another_bucket/"

def write_table(table_name: string) -> None:
   all_df.where(all_df["file_name"].contains(table_name))
     .write
     .partitionBy('partition_key_1','partition_key_2')
     .parquet(f"{dest_path}/{table_name}")

for t in TABLE_NAMES:
   write_table(t)

说明:
我们生成输入路径并将其存储到 input_paths . 这将创建如下路径: s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN .
然后我们将所有路径加载到一个Dataframe中,在其中添加一个名为 file_name ,这将保留每行的路径。注意,我们还使用 cache 在这里,这很重要,因为我们有多个 len(TABLE_NAMES) 以下代码中的操作。使用缓存将阻止我们一次又一次地加载数据源。
接下来我们创建 write_table 它负责保存给定表的数据。下一步是使用 all_df["file_name"].contains(table_name) ,这将仅返回包含 table_namefile_name 列。最后,我们保存过滤后的数据。
在最后一步,我们呼吁 write_table 每一项 TABLE_NAMES .
相关链接
如何在一次加载中导入多个csv文件?
为序列文件格式的文件获取pyspark中的hdfs文件路径

相关问题