pyspark 如何在Azure数据库中从特定文件夹读取数据流

tv6aics1  于 2022-12-28  发布在  Spark
关注(0)|答案(1)|浏览(114)

我需要知道在读取流中如何开始从特定文件夹阅读文件。在我的存储帐户数据来自2019年yyyymmdd格式,我需要从2022年选择数据,并在检查点流的帮助下将照顾新的月份数据。

checkpointPath = '/mnt/checkpointasnmod1'

schemapath = '/FileStore/tables/scema-1.txt'

inputPath ='/mnt/ASN-1.0/*/*'

outputPath ='/mnt/rawoutputpartially1'

schemaJson = spark.read.text(schemapath).first()[0]
schemaStruct = StructType.fromJson(json.loads(schemaJson))

df =spark.readStream.format("cloudFiles").option("cloudFiles.useNotifications","false").option("cloudFiles.validateOptions","false").option("cloudFiles.format","text").option("wholetext","true").option("modifiedAfter","2022-12-20T13:00:00").load(inputPath,schema=None).writeStream.trigger(once=True).outputMode("append").queryName(inputPath).foreachBatch(transformasn).option("checkpointLocation",checkpointPath).start()

这是没有给出任何结果,如果我将尝试读取从2018年直到日期的数据大小几乎是25 GB这是给内存错误。可以任何一个帮助,以便从2022/* 读取哪个属性可以帮助readstream。

1aaf6o9v

1aaf6o9v1#

在数据库中过滤文件与特定的文件夹,你可以使用文件夹名称与*采取所有类似的文件夹/2022*/*

    • 示例**
demodf=(spark.readStream.format("cloudFiles")
        .option("cloudFiles.useNotifications","false")
        .option("cloudFiles.validateOptions","false")
        .option("cloudFiles.format","text")
        .option("wholetext","true")
        .option("modifiedAfter","2022-12-20T13:00:00")
        .load("/mnt/df1/2022*/*",schema=None))

/2022*/*它将从2022开始的所有文件夹及其中的所有文件读取数据。

    • 执行日期:**

相关问题