Pyspark: How to write a table to an AWS S3 file

ui7jx7zq · posted 2023-03-03 in Apache

I am trying to write a simple file to S3:

from pyspark.sql import SparkSession
from pyspark import SparkConf
import os
import sys
from dotenv import load_dotenv
from pyspark.sql.functions import *

# Load environment variables from the .env file
load_dotenv()

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

# My spark configuration
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.2')
conf.set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID)
conf.set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Create a PySpark DataFrame
df = spark.createDataFrame([(1, "John Doe", 30), (2, "Jane Doe", 35), (3, "Jim Brown", 40)], ["id", "name", "age"])

# Write the DataFrame to a CSV file on S3
df.write.format("csv").option("header","true").mode("overwrite").save("s3a://bucket/test/store/price.csv")

# Stop the Spark context and Spark session
spark.stop()

But this does not save price.csv as a file; it creates a new empty folder instead:

If I save locally instead, the result is the same: it just creates a folder named price.csv.

jchrr9hc 1#

This is because Spark always writes out a bunch of files (one part file per partition, a _SUCCESS marker, and so on).
I have written a function for Databricks that promotes a folder containing a single partition to a single file.

#
# Define function to find matching files
# 

# import libraries
import fnmatch

# define function
def get_file_list(path_txt, pattern_txt):
  
  # list of file info objects
  fs_lst = dbutils.fs.ls(path_txt)
  
  # create list of file names (FileInfo.name)
  dir_lst = list()
  for f in fs_lst:
      dir_lst.append(f.name)
      
  # filter file names by pattern
  files_lst = fnmatch.filter(dir_lst, pattern_txt)
  
  # return list
  return(files_lst)

This function finds matching files in a directory.
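For example, a call like the following (the path and pattern are illustrative, not from the original post) returns the part files Spark wrote into an output folder:

# illustrative call: find the part files inside a Spark output folder
# (the path below is a hypothetical example)
part_files = get_file_list("/lake/bronze/weather/temp/", "part*.parquet")
print(part_files)  # typically a single name like part-00000-<uuid>.snappy.parquet when repartition(1) is used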

# 
#  Keep only the single delimited file
#

# Define function
def unwanted_file_cleanup(folder_name, file_name, file_ext):
  try:
    
    # define tmp dir
    tmp_dir = folder_name
    
    # find new file
    tmp_lst = get_file_list(tmp_dir, "part*." + file_ext)
    tmpfile_txt = tmp_dir + "/" + tmp_lst[0]

    # remove old file
    dbutils.fs.rm(file_name, recurse=False)
    
    # copy new file
    dbutils.fs.cp(tmpfile_txt, file_name)
    
    # remove tmp dir, clean up step
    dbutils.fs.rm(tmp_dir, recurse=True)
    
    return True
  
  except Exception as err:
    raise(err)

This function converts the output directory into a single file. You must write the data with a single partition for it to work.
Databricks ships with sample temperature data.

# read in low temps
path1 = "/databricks-datasets/weather/low_temps"
df1 = (
  spark.read                    
  .option("sep", ",")        
  .option("header", "true")
  .option("inferSchema", "true")  
  .csv(path1)               
)

# create temp view
df1.createOrReplaceTempView("tmp_low_temps")

Read in the csv files and create a temp view for the low temperatures.

# read in high temps
path2 = "/databricks-datasets/weather/high_temps"
df2 = (
  spark.read                    
  .option("sep", ",")        
  .option("header", "true")
  .option("inferSchema", "true")  
  .csv(path2)               
)

# create temp view
df2.createOrReplaceTempView("tmp_high_temps")

Read in the csv files and create a temp view for the high temperatures.

# make sql string
sql_stmt = """
  select 
    l.date as obs_date,
    h.temp as obs_high_temp,
    l.temp as obs_low_temp
  from 
    tmp_high_temps as h
  join
    tmp_low_temps as l
  on
    h.date = l.date
"""

# execute
df = spark.sql(sql_stmt)

# Write out parquet file
path = "/lake/bronze/weather/temp"
(
  df.repartition(1).write
    .format("parquet")
    .mode("overwrite")
    .save(path)
)

In my use case I only wrote out a single parquet file representing the join of the two datasets, but the idea is the same regardless of cloud vendor: Spark creates multiple output files. Make sure you use repartition(1) in your use case so that only one csv-type output file is produced.
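Applied back to the original question, a minimal sketch might look like this (same s3a path as in the question; note that price.csv is still created as a folder containing a single part-*.csv file, which then has to be copied/renamed to the final name, for example with a cleanup step like the functions above):

# sketch: force a single partition so only one part file is produced
# (price.csv will still be a folder; the single part file inside it
#  must be copied/renamed afterwards to get a plain price.csv object)
(
  df.repartition(1).write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3a://bucket/test/store/price.csv")
)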

Here are the multiple files I am talking about.
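You can see them by listing the output folder; with a single partition you would typically find one part file plus Spark's marker files:

# list the temporary output folder; expect one part-*.parquet file
# plus marker files such as _SUCCESS (exact names vary by Spark/Databricks version)
dbutils.fs.ls("/lake/bronze/weather/temp/")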

#
# create single file
#
unwanted_file_cleanup("/lake/bronze/weather/temp/", "/lake/bronze/weather/temperature-data.parquet", "parquet")

The function above finds the partition file, moves/renames it to the parent directory, and removes the temporary directory.

#
# Show directory listing
#
dbutils.fs.ls("/lake/bronze/weather/")

Output:

[FileInfo(path='dbfs:/lake/bronze/weather/temperature-data.parquet', name='temperature-data.parquet', size=9138)]
