我需要知道在读取流中如何开始从特定文件夹阅读文件。在我的存储帐户数据来自2019年yyyymmdd格式,我需要从2022年选择数据,并在检查点流的帮助下将照顾新的月份数据。
checkpointPath = '/mnt/checkpointasnmod1'
schemapath = '/FileStore/tables/scema-1.txt'
inputPath ='/mnt/ASN-1.0/*/*'
outputPath ='/mnt/rawoutputpartially1'
schemaJson = spark.read.text(schemapath).first()[0]
schemaStruct = StructType.fromJson(json.loads(schemaJson))
df =spark.readStream.format("cloudFiles").option("cloudFiles.useNotifications","false").option("cloudFiles.validateOptions","false").option("cloudFiles.format","text").option("wholetext","true").option("modifiedAfter","2022-12-20T13:00:00").load(inputPath,schema=None).writeStream.trigger(once=True).outputMode("append").queryName(inputPath).foreachBatch(transformasn).option("checkpointLocation",checkpointPath).start()
这是没有给出任何结果,如果我将尝试读取从2018年直到日期的数据大小几乎是25 GB这是给内存错误。可以任何一个帮助,以便从2022/* 读取哪个属性可以帮助readstream。
1条答案
按热度按时间1aaf6o9v1#
在数据库中过滤文件与特定的文件夹,你可以使用文件夹名称与
*
采取所有类似的文件夹/2022*/*
/2022*/*
它将从2022开始的所有文件夹及其中的所有文件读取数据。