pandas: How to save a DataFrame as CSV to a folder in Azure Data Lake using Databricks Utilities (dbutils)

y4ekin9u · posted 2022-12-09 · in: Other

I have a DataFrame named data that I am saving as a CSV file to my data lake using pandas.to_csv. However, saving the file as CSV takes a long time. Can someone tell me how to use dbutils to save the CSV file to the data lake? Also, please confirm whether my code for creating the directory if it does not exist is correct:

d = data.groupby(['Col1', 'Col2'])
for k, Dates in d:
    if not Dates.empty:
        # Note: dbutils.fs expects DBFS paths (e.g. 'dbfs:/mnt/...'), while pandas
        # writes through the local fuse mount (e.g. '/dbfs/mnt/...').
        PATH = '/dbfs/mnt/data/../'
        try:
            dbutils.fs.ls(PATH)          # raises if the directory does not exist
        except Exception as e:
            if 'java.io.FileNotFoundException' in str(e):
                dbutils.fs.mkdirs(PATH)
        # 'Day' is not defined in this snippet; presumably derived from the group key k
        Dates.to_csv(PATH + f'{Day}.csv', index=False)

uemypmqf 1#

There is no dbutils method that writes a DataFrame to CSV directly; the Spark writers (with coalesce or partitioning) save CSV, but they create part files with random names. To create files with the required names, we use the pandas to_csv method; a minimal sketch follows.
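For reference, here is a minimal sketch of that pandas route in a Databricks notebook (where dbutils is available); the mount point /mnt/data and the folder and file names are placeholders, not from the original post:

import pandas as pd

# The same folder seen two ways: dbutils.fs uses DBFS paths, pandas uses the fuse mount
dir_dbfs = 'dbfs:/mnt/data/reports'
dir_fuse = '/dbfs/mnt/data/reports'

dbutils.fs.mkdirs(dir_dbfs)                        # no-op if the directory already exists

df = pd.DataFrame({'Col1': ['A', 'B'], 'Col2': [1, 2]})
df.to_csv(dir_fuse + '/sample.csv', index=False)   # written with the exact name you choose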

Method 1

  1. From Azure Databricks home, you can go to “Upload Data” (under Common Tasks) → “DBFS” → “FileStore”.
  2. I created a folder “df” and saved a DataFrame “Sample” to CSV. It is important to use coalesce(1), since it writes the DataFrame as a single file:
Sample.coalesce(1).write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/df/Sample.csv")
  3. The “part-00000” file inside the saved folder is the CSV file

  4. Download the file to local and rename it if required (or rename it with dbutils, as sketched after this list)

  5. Upload the CSV file manually to the data lake storage
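If you would rather skip the manual download-and-rename, the part file can also be renamed in place with dbutils; a minimal sketch, assuming the folder created by the save above (Sample_renamed.csv is a placeholder name):

# 'dbfs:/FileStore/df/Sample.csv' is a folder here: Spark writes a directory of part files
src_dir = 'dbfs:/FileStore/df/Sample.csv'
# Pick the single part file (there is exactly one because of coalesce(1))
part_file = [f.path for f in dbutils.fs.ls(src_dir) if f.name.startswith('part-')][0]
dbutils.fs.cp(part_file, 'dbfs:/FileStore/df/Sample_renamed.csv')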

Method 2

  1. Read the data from DBFS (the Databricks File System):
df = spark.read.format("csv").option("recursiveFileLookup", "true").option("inferSchema", "true").option("header", "true").load("dbfs:/myfolder/sample/")
df.show()

  2. Configure the storage account access key globally:
spark.conf.set("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<<ACCESS KEY>>")
  3. Configure the storage account output folder:
output_container_path = "abfss://<<filesystem>>@<<Storage_Name>>.dfs.core.windows.net/<<DirectoryName>>"
output_blob_folder = "%s/CSV_data_folder" % output_container_path
  4. Write the DataFrame as a single CSV file to storage:
(df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .format("com.databricks.spark.csv")
    .save(output_blob_folder))
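The file inside CSV_data_folder again gets a random part-file name. If a fixed name is needed in the data lake, the same ls/cp pattern from Method 1 works on the abfss:// path, assuming the access key configured above also applies to dbutils.fs on your cluster (data.csv is a placeholder name):

# Copy the randomly named part file to a stable name inside the output folder
part_file = [f.path for f in dbutils.fs.ls(output_blob_folder) if f.name.startswith('part-')][0]
dbutils.fs.cp(part_file, output_blob_folder + '/data.csv')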

  5. The uploaded file is now in storage
