I'm trying to create Parquet files in Databricks. When I check the file path, the directory and the files exist. But I keep getting a message that the file path for the first CSV file does not exist (even though it's there!). Not sure what to do.
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType

# Define the schema for the files you want to convert
schema = StructType([
    StructField("METER_ADDRESS", StringType(), True),
    StructField("READING_DATE", TimestampType(), True),
    StructField("READING_VALUE_L", DoubleType(), True),
    StructField("LOW_BATTERY_ALR", IntegerType(), True),
    StructField("LEAK_ALR", IntegerType(), True),
    StructField("MAGNETIC_TAMPER_ALR", IntegerType(), True),
    StructField("METER_ERROR_ALR", IntegerType(), True),
    StructField("BACK_FLOW_ALR", IntegerType(), True),
    StructField("BROKEN_PIPE_ALR", IntegerType(), True),
    StructField("EMPTY_PIPE_ALR", IntegerType(), True),
    StructField("SPECIFIC_ERROR_ALR", IntegerType(), True)
])

# Set the input and output directories
input_directory = "/dbfs/FileStore/tables/Calybre Capstone Project - Part 1"
output_directory = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1/Parquet Files"

# Iterate over each file in the input directory
for filename in os.listdir(input_directory):
    if filename.endswith(".csv"):
        filepath = os.path.join(input_directory, filename)
        # Read in the file using spark.read()
        df = spark.read.csv(filepath, header=True, schema=schema)
        # Write the resulting DataFrame as a parquet file
        output_path = os.path.join(output_directory, filename + ".parquet")
        df.write.parquet(output_path)
2 Answers
v2g6jxz61#
In Databricks, paths passed to the Spark APIs should start with dbfs:/ rather than /dbfs/. So update the input_directory variable as follows:
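Presumably the same path with the dbfs:/ scheme, i.e.:

    input_directory = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1"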
Your output_directory path looks fine.
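One caveat with that change: Python's os.listdir() only understands the /dbfs/ FUSE mount, not dbfs:/ URIs, so the directory listing would then have to go through dbutils.fs.ls instead. A minimal sketch, reusing the schema and variables from the question:

    # dbutils.fs.ls accepts dbfs:/ URIs and returns FileInfo objects
    for info in dbutils.fs.ls(input_directory):
        if info.name.endswith(".csv"):
            # info.path is already a dbfs:/ URI that spark.read can consume
            df = spark.read.csv(info.path, header=True, schema=schema)
            df.write.parquet(output_directory + "/" + info.name + ".parquet")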
wmtdaxz32#
This is a very inefficient way to handle multiple files - you only need to provide a file mask that matches all of the files so they are processed in one go (when using the Spark APIs on Databricks, you don't need to specify dbfs: or /dbfs):
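A sketch of what that could look like - the *.csv mask and the output name all_meter_readings.parquet are assumptions based on the question's directory layout:

    # One read picks up every CSV in the directory; no dbfs: or /dbfs prefix needed
    df = spark.read.csv(
        "/FileStore/tables/Calybre Capstone Project - Part 1/*.csv",
        header=True,
        schema=schema,
    )
    # A single write then produces one Parquet dataset instead of one file per input CSV
    df.write.parquet(
        "/FileStore/tables/Calybre Capstone Project - Part 1/Parquet Files/all_meter_readings.parquet"
    )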