优化从s3 bucket中的分区Parquet文件读取

yx2lnoni  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(559)

我有一个Parquet格式的大数据集(大小约1tb),分为2个层次: CLASS 以及 DATE 只有7节课。但从2020年1月1日开始,这个日期一直在增加。我的数据按 CLASS 先是然后 DATE 比如:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

我加载我的数据 CLASS 在for循环中。如果我加载整个parquet文件,warn会终止作业,因为它会重载内存示例。但是自从我在我的模型中做百分位计算以来,我每天都在加载。这个方法大约需要23小时才能完成。
但是,如果我重新划分,使我只有 CLASS 分区,这项工作大约需要10个小时。子分区过多是否会降低spark执行器作业的速度?我将分区层次结构保持为 CLASS -> DATE 只是因为我需要在 DATE 每一天。如果只有一个分区更有效,那么我就必须重新分区到 CLASS 每天加载新数据后分区。有人能解释一下为什么只有一个分区工作得更快吗?如果是这样的话,最好的方法是每天通过追加数据而不重新划分整个数据集来划分数据?
谢谢您
edit:我使用文件结构上的for循环来循环 CLASS 像这样划分:

fs = s3fs.S3FileSystem(anon=False)    
inpath="s3://bucket/file.parquet/"

Dirs= fs.ls(inpath)
for paths in Dirs:
    customPath='s3://' + uvapath + '/'
    class=uvapath.split('=')[1]
    df=spark.read.parquet(customPath)
    outpath="s3://bucket/Output_" + class + ".parquet"

# Perform calculations

df.write.mode('overwrite').parquet(outpath)

上膛的 df 会有所有的约会日期 CLASS=1 . 然后,我将该文件作为单独的Parquet文件输出 CLASS 因此我有7个Parquet文件:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

然后我合并成一个单一的Parquet7Parquet不是一个问题,因为结果Parquet文件小得多。

piah890a

piah890a1#

我的分区数据有三列:年、月和id

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

我可以通过加载根路径来读取Dataframe。

val df = spark.read.parquet("s3://mybucket/")

然后,分区列会自动添加到Dataframe中。现在,您可以通过以下方式为分区列筛选数据

val df_filtered = df.filter("year = '2020' and month = '09'")

做点什么 df_filtered 那么spark将只使用分区数据!
对于重复处理,可以使用 fair scheduler Spark的Spark。使用以下代码将fair.xml文件添加到项目的src/main/resources中,

<?xml version="1.0"?>

<allocations>
    <pool name="fair">
        <schedulingMode>FAIR</schedulingMode>
        <weight>10</weight>
        <minShare>0</minShare>
    </pool>
</allocations>

并在创建spark会话后设置spark配置。

spark.sparkContext.setLocalProperty("spark.scheduler.mode", "FAIR")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

然后你就可以同时做你的工作了。你可能想并行作业取决于类,所以

val classes = (1 to 7).par
val date = '2020-09-25'

classes foreach { case i =>

    val df_filtered = df.filter(s"CLASS == '$i' and DATE = '$date'")

    // Do your job

}

代码将同时使用不同的类值。

相关问题