pyspark 如何在数据块中循环Azure Blob存储位置并将文件处理到 Dataframe 中?

41zrol4v  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(95)

我有一个由事件中心创建的文件系统,它每10分钟将文件保存到一个位置,格式如下:{命名空间}/{事件中心}/{分区ID}/{年}/{月}/{日}/{小时}/{分钟}/{秒}.avro
例如:事件thub/事件子服务/0/2022/01/01/01/11/31.avro
有2个分区:0和1,其余为上述日期格式。
我试图找出一种方法来循环每个文件夹结构,把avro文件,把它变成一个df,然后把它放在一个更合理的地方。
然而,我还是想不明白,也没什么进展。我已经做到了:

dbutils.fs.ls('/mnt/mount-name/eventhub/eventhubservice/0/2022/01/01/01/11/31.avro')

df = spark.read.format("com.databricks.spark.avro").load("/mnt/mount-name/eventhub/eventhubservice/0/2022/01/01/01/11/31.avro")
display(df)

以前有人在Azure Databricks中做过类似的事情吗?

5m1hhzi4

5m1hhzi41#

  • 您可以使用glob文件路径,同时将数据读入 Dataframe 。我有以下文件夹结构,其中每个文件夹都有一个csv文件进行演示。

  • 现在,为了将这些文件夹中的所有文件读入一个 Dataframe ,我以下面的方式使用了glob file path代码。
df = spark.read.option("header",True).format("csv").load("/mnt/repro/2022/10/*/*.csv")

# both the files have same data for demonstration

df.show()

  • 因此,只要文件夹中有多个文件夹/文件,就将其替换为*,表示所有内容。使用以下代码:
df = spark.read.format("com.databricks.spark.avro").load("/mnt/mount-name/eventhub/eventhubservice/*/2022/*/*/*/*/*.avro")
display(df)

# for year 2022.

相关问题