是否可以在pyspark中修改输出数据文件名?

erhoui1w  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(804)

简化的案例。
假设我在目录中有5个输入文件 data_directory :

data_2020-01-01.txt,
data_2020-01-02.txt,
data_2020-01-03.txt,
data_2020-01-04.txt,
data_2020-01-05.txt

我把它们都读给pyspark rdd,并对它们执行一些不做任何洗牌的操作。

spark = SparkSession.builder.appName("Clean Data").getOrCreate()
sparkContext = spark.sparkContext

input_rdd = sparkContext.textFile("data_directory/*")
result = input_rdd.mapPartitions(lambda x: remove_corrupted_rows(x))

现在我想保存数据:

result.saveAsTextFile(
    "results",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
)

我得到了5个文件,每个文件都包含“part”。所以我丢失了输出文件来自哪个输入文件的信息:

._SUCCESS.crc
.part-00000.gz.crc
.part-00001.gz.crc
.part-00002.gz.crc
.part-00003.gz.crc
.part-00004.gz.crc
_SUCCESS
part-00000.gz
part-00001.gz
part-00002.gz
part-00003.gz
part-00004.gz

在这种情况下,有没有保留输入文件名或引入我自己的命名模式?
预期结果:

._SUCCESS.crc
.data_2020-01-01.gz.crc
.data_2020-01-02.gz.crc
.data_2020-01-03.gz.crc
.data_2020-01-04.gz.crc
.data_2020-01-05.crc
_SUCCESS
data_2020-01-01.gz
data_2020-01-02.gz
data_2020-01-03.gz
data_2020-01-04.gz
data_2020-01-05.gz
3z6pesqy

3z6pesqy1#

你可以用 pyspark.sql.functions.input_file_name() (此处为文档)https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=functions#pyspark.sql.functions.input_file_name)然后按创建的列对Dataframe进行分区。
这样,5个输入文件应该给您一个包含5个不同值的分类列,对它进行分区应该将您的输出分成5个部分。
或者,如果您希望有一个完整的命名模式,那么可以在 input_file_name() 列(此处为5个Dataframe),重新分区(例如,使用 coalesce(1) )然后使用自定义逻辑(例如dictMap或从列中提取文件名并解析为 DataFrameWriter.csv() 作为名称)。
n、 b:当更改为1个分区时,请确保所有数据都适合您的内存!

相关问题