import org.apache.spark.sql.functions.{col, date_sub}
// Calculate the date 14 days ago
val cutoff_date = date_sub(current_date(), 14)
// Read data from directories for the last 2 weeks
val data = spark.read.parquet("/path/to/data")
.filter(col("batch_date") >= date_format(cutoff_date, "yyyy-MM-dd"))
val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split("=")(1) >= min_date)
2条答案
按热度按时间gpnt7bae1#
Spark旨在高效地读取分区数据。在阅读分区数据时,Spark只读取执行所需操作所需的文件和分区,避免读取整个数据集。
为了在Spark中有效地读取分区数据,必须在阅读数据时指定分区结构。
在你的例子中,分区是“batch_date”,所以要读取前14天的数据,你只需要这样做:
olmpazwi2#
你已经在做的是最佳的,因为apache spark中的PartitionFilters的概念,所以当你在分区列上应用过滤器时,这些过滤器会在通过网络发送任何数据之前应用于源数据,以减少传输的数据量。
例如,我有一些按年份分区的数据:
如果我应用以下代码:
我将得到以下物理计划:
如果你搜索PartitionFilters,你会发现:
这意味着应用了分区过滤器,并且只加载所需的分区,但是如果您没有看到PartitionFilters,则意味着出现了错误,整个数据将被加载
如果由于某种原因PartitionFilters不起作用,那么您可以始终使用Hadoop来过滤要使用spark加载的文件夹
然后使用spark读取filesToRead。