scala 如何根据日期范围过滤Parquet地板分区?

44u64gxh  于 2023-04-06  发布在  Scala
关注(0)|答案(2)|浏览(199)

我已经分割了 parquet 数据:

dir/batch_date=2023-02-13/batch_hour=09

我必须通过spark程序读取最近14天的数据。目前,我正在阅读数据并在dataframe上应用日期过滤器batch_date减去14天。有没有办法将目录范围限制为仅读取最近14天的目录,而不是整个数据集。
谢谢

gpnt7bae

gpnt7bae1#

Spark旨在高效地读取分区数据。在阅读分区数据时,Spark只读取执行所需操作所需的文件和分区,避免读取整个数据集。
为了在Spark中有效地读取分区数据,必须在阅读数据时指定分区结构。
在你的例子中,分区是“batch_date”,所以要读取前14天的数据,你只需要这样做:

import org.apache.spark.sql.functions.{col, date_sub}

// Calculate the date 14 days ago
val cutoff_date = date_sub(current_date(), 14)

// Read data from directories for the last 2 weeks
val data = spark.read.parquet("/path/to/data")
  .filter(col("batch_date") >= date_format(cutoff_date, "yyyy-MM-dd"))
olmpazwi

olmpazwi2#

你已经在做的是最佳的,因为apache spark中的PartitionFilters的概念,所以当你在分区列上应用过滤器时,这些过滤器会在通过网络发送任何数据之前应用于源数据,以减少传输的数据量。
例如,我有一些按年份分区的数据:

/path/
   Year=2018/
       file.parquet
   Year=2019/
       file.parquet
   Year=2020/
       file.parquet
   Year=2021/
       file.parquet
   Year=2022/
       file.parquet
   Year=2023/
       file.parquet

如果我应用以下代码:

spark.read.parquet("/path/").filter(col("Year") >= "2020").explain()

我将得到以下物理计划:

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Variable_name#0,Value#1,Units#2,Year#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/user/out..., PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)], PushedFilters: [], ReadSchema: struct<Variable_name:string,Value:string,Units:string>

如果你搜索PartitionFilters,你会发现:

PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)]

这意味着应用了分区过滤器,并且只加载所需的分区,但是如果您没有看到PartitionFilters,则意味着出现了错误,整个数据将被加载
如果由于某种原因PartitionFilters不起作用,那么您可以始终使用Hadoop来过滤要使用spark加载的文件夹

val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split("=")(1) >= min_date)

然后使用spark读取filesToRead。

相关问题