spark查找日期分区列的最大值

hec6srdp  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(693)

我有一个Parquet地板按以下方式分割:

data
/batch_date=2020-01-20
/batch_date=2020-01-21
/batch_date=2020-01-22
/batch_date=2020-01-23
/batch_date=2020-01-24

这里batch\u date是分区列的日期类型。
我只想从最新的日期分区读取数据,但作为一个消费者,我不知道最新的值是多少。
我可以用一个简单的小组

df.groupby().agg(max(col('batch_date'))).first()

虽然这会起作用,但这是一种非常低效的方式,因为它涉及到groupby。
我想知道我们是否可以更有效地查询最新的分区。
谢谢。

kkbh8khc

kkbh8khc1#

函数“max”可以在没有“groupby”的情况下使用:

df.select(max("batch_date"))
jei2mxaa

jei2mxaa2#

使用show partitions获取表的所有分区

show partitions TABLENAME

输出如下

pt=2012.07.28.08/is_complete=1
pt=2012.07.28.09/is_complete=1

我们可以使用下面的查询从特定的分区获取数据

select * from TABLENAME where pt='2012.07.28.10' and is_complete='1' limit 1;

或者可以对其应用附加筛选器或分组方式。

gmxoilav

gmxoilav3#

执行@pasha701建议的方法需要加载整个sparkDataframe和所有批处理数据分区,然后找到其中的max。我认为作者正在寻求一种直接找到最大分区日期并只加载它的方法。一种方法是使用hdfs或s3fs,将s3路径的内容作为列表加载,然后找到最大分区,然后仅加载该分区。那样效率会更高。
假设您使用的是aws s3格式,如下所示:

import sys
import s3fs

datelist=[]
inpath="s3:bucket_path/data/"
fs = s3fs.S3FileSystem(anon=False)
Dirs = fs.ls(inpath)
for paths in Dirs:
    date=paths.split('=')[1]
    datelist.append(date)
maxpart=max(datelist)

df=spark.read.parquet("s3://bucket_path/data/batch_date=" + maxpart)

这将完成列表中的所有工作,而不将任何内容加载到内存中,直到找到要加载的内容为止。

相关问题