为什么spark应用程序将Dataframe保存到带有多个csv文件的s3存储桶中

a0x5cqrl  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(489)

嗨,我是spark和amazon电子病历集群的新手。
我试图编写一个可以在amazonemr集群上运行的演示spark应用程序。当代码在zeppelin笔记本上运行时,它会返回输出,我认为输出将保存为amazon emr集群上的单个文件,如下所示:

%pyspark
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
    df_new = df.withColumn('upper_c', upper(df.c))
df_new

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+

spark应用程序:

from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper
from datetime import datetime, date
import argparse

def pre_processing(output_uri):

    spark =  SparkSession.builder.appName("process sample data").getOrCreate()
    rdd = spark.sparkContext.parallelize([
        (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ])
    df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
    spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
    if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
        df_new = df.withColumn('upper_c', upper(df.c))
    df_new
    df_new.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_uri', help="The URI where output is saved")
    args = parser.parse_args()
    pre_processing(args.output_uri)

不过,当我在集群上运行spark应用程序时,它会将多个csv文件保存到s3 bucket中。我想知道为什么我的spark应用程序会将Dataframe保存到s3存储桶上的多个文件中。
spark应用程序的参数如下:

spark-submit --deploy-mode cluster s3://<BUCKET_NAME>/spark_application/emr_demo_app.py --output_uri s3://<BUCKET_NAME>/output

提前谢谢。
ps:有一次,我按照aws emr教程的下一页,和样本应用程序保存为一个单一的csv文件。

8tntrjer

8tntrjer1#

spark使用分区的概念,以便在工作者之间并行化任务。Dataframe也被分区,当调用save操作时,每个worker将保存Dataframe的一部分,从而创建多个文件。
为了创建单个文件,只需 repartition 或者 coalesce 将Dataframe划分为一个分区:

df_new.repartition(1).write.option("header", "true").mode("overwrite").csv(output_uri)

所有的数据都被发送到一个worker,然后worker将记录保存到一个文件中。如果数据集太大,就会出现瓶颈问题。这里有一个类似的答案:使用sparkcsv编写单个csv文件

相关问题