从文件系统有条件加载分区

3zwtqj6y  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(363)

我知道在pysparks中有关于通配符的问题 .load() -像这里或这里这样运作。不管怎样,我找到的问题/答案中没有一个涉及到我的变化。

上下文

在pyspark中,我想直接从hdfs加载文件,因为我必须使用databricks avro library for spark 2.3.x。我是这样做的:

partition_stamp = "202104"

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={partition_stamp}*") \
        .select("...")

如您所见,分区是从格式中的时间戳派生的 yyyyMMdd .

问题

目前我只得到2021年4月使用的所有分区( partition_stamp = "202104" ). 但是,我需要从2021年4月开始的所有分区。
用伪代码编写,我需要一个类似的解决方案:

.load(f"/path/partition >= {partition_stamp}*")

因为实际上有几百个分区,所以以任何需要硬编码的方式来做是没有用的。
所以我的问题是:是否有一个函数用于条件文件加载?

8e2ybdfx

8e2ybdfx1#

据我所知,只有以下选项可以动态处理 .load() -功能:


* :  Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)

[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
{1,2,3}: Set-like inclusion of a defined set of characters -> (/path/20200{1,2,3}/...)

因此,回答我的问题:没有用于条件文件加载的内置函数。
不管怎样,我想为您提供我的解决方案:

import pandas as pd # Utilize pandas date-functions

partition_stamp = ",".join((set(
                        str(_range.year) + "{:02}".format(_range.month) 
                        for _range in pd.date_range(start=start_date, end=end_date, freq='D')
                 )))

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={{{partition_stamp}}}*") \
        .select("...")

这样就减少了对时间戳格式的限制 yyyyMM 为给定的开始和结束日期以及基于字符串的 .load() 仍然可用。

相关问题