我尝试向S3写入一个简单的文件:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import os
from dotenv import load_dotenv
from pyspark.sql.functions import *
# Load environment variables from the .env file
load_dotenv()
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
# My spark configuration
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.2')
conf.set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID)
conf.set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Create a PySpark DataFrame
df = spark.createDataFrame([(1, "John Doe", 30), (2, "Jane Doe", 35), (3, "Jim Brown", 40)], ["id", "name", "age"])
# Write the DataFrame to a CSV file on S3
df.write.format("csv").option("header","true").mode("overwrite").save("s3a://bucket/test/store/price.csv")
# Stop the Spark context and Spark session
spark.stop()
但这并不能像文件那样保存price.csv,它会打开一个新的空文件夹:
如果我想本地保存,结果相同。这只会打开一个名为price.csv的文件夹
1条答案
按热度按时间jchrr9hc1#
这是因为spark总是写出一堆文件,记住分区、成功等等。
我已经编写了一个函数,用在数据库中,将具有单个分区的文件夹提升为文件。
此函数用于查找目录中的文件。
这个函数将输出目录转换成一个文件。你必须使用一个分区来保存这个文件。
Databricks附带样本温度数据。
读入csv文件并创建低温的温度视图。
读入csv文件并创建高温的温度视图。
在我的用例中,我只写了一个parquet文件来表示两个数据集的连接,但是spark创建多个文件的想法是一样的,不管云供应商是谁,确保在你的用例中使用repartition(1)只得到一个csv类型的输出文件。
这里是我所说的多个文件。
上面的函数找到分区文件,移动/重命名到父目录并删除临时目录。
输出: