Pyspark -在Azure synapse工作区列表中获取容器文件夹的所有内容并存储这些数据

aiqt4smr  于 2023-04-29  发布在  Spark
关注(0)|答案(1)|浏览(115)

在Synapse Workspace中,我使用此函数获取配置容器中包含的所有文件:

mssparkutils.fs.ls("abfss://config@datalake.dfs.core.windows.net/")

我得到了这个清单:

[FileInfo(path=abfss://config@datalake.dfs.core.windows.net/config.json, name=config.json, size=26771),
 FileInfo(path=abfss://config@datalake.dfs.core.windows.net/d365crm/Account.xml, name=Account.xml, size=3041),
 FileInfo(path=abfss://config@datalake.dfs.core.windows.net/d365crm/Contact.xml, name=Contact.xml, size=1985),
 FileInfo(path=abfss://config@datalake.dfs.core.windows.net/d365crm/Contract.xml, name=Contract.xml, size=1987)]

我想将此数据存储在pyspark dataframe中

我尝试了以下代码,但返回的都是null值:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

list_df = [item for sublist in list_for_dataframe for item in sublist]

schema = StructType([
    StructField("path", StringType(), True),
    StructField("name", StringType(), True),
    StructField("size", LongType(), True)
])

df = spark.createDataFrame(list_df, schema=schema)

# Show the DataFrame
df.show()

而我所期望的是:

有人能帮我实现这一点吗?
谢谢大家!

h6my8fg2

h6my8fg21#

如果你能够得到递归文件列表,如上面的列表,将其转换为字典列表。从列表中获取数据框。
对于示例,我有wasbs,这是我的mssparkutils.fs.ls()列表。

将其转换为如下所示的字典列表,您可以看到dataframe已创建。

from pyspark.sql.types import StructType,StructField, StringType, IntegerType,LongType

data=[]
for i in files_list:
    d={}
    d["path"]=i.path
    d["name"]=i.name
    d["size"]=i.size
    data.append(d)
print("List of dictionaries : ",data)

schema = StructType([
    StructField("path", StringType(), True),
    StructField("name", StringType(), True),
    StructField("size", LongType(), True)
])

df = spark.createDataFrame(data, schema=schema)
print("Dataframe is: ")
display(df)

你可以通过@Raki Rahmanblog来学习递归文件列表和使用pandas创建数据框。

相关问题