Parquet files from CSV files in Databricks

qltillow · asked on 2023-04-27 · in: Other

I'm trying to create Parquet files in Databricks. When I check the file path, the directory and the files exist. But I keep getting a message that the file path for the first CSV file does not exist (even though it is there!). Not sure what to do.

import os

# Define the schema for the files you want to convert
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType

schema = StructType([
  StructField("METER_ADDRESS", StringType(), True),
  StructField("READING_DATE", TimestampType(), True),
  StructField("READING_VALUE_L", DoubleType(), True),
  StructField("LOW_BATTERY_ALR", IntegerType(), True),
  StructField("LEAK_ALR", IntegerType(), True),
  StructField("MAGNETIC_TAMPER_ALR", IntegerType(), True),
  StructField("METER_ERROR_ALR", IntegerType(), True),
  StructField("BACK_FLOW_ALR", IntegerType(), True),
  StructField("BROKEN_PIPE_ALR", IntegerType(), True),
  StructField("EMPTY_PIPE_ALR", IntegerType(), True),
  StructField("SPECIFIC_ERROR_ALR", IntegerType(), True)
])

# Set the input and output directories
input_directory = "/dbfs/FileStore/tables/Calybre Capstone Project - Part 1"
output_directory = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1/Parquet Files"

# Iterate over each file in the input directory
for filename in os.listdir(input_directory):
    if filename.endswith(".csv"):
        filepath = os.path.join(input_directory, filename)

        # Read in the file using spark.read()
        df = spark.read.csv(filepath, header=True, schema=schema)

        # Write the resulting DataFrame as a parquet file
        output_path = os.path.join(output_directory, filename + ".parquet")
        df.write.parquet(output_path)
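
For reference, a minimal check (not part of the original question) showing how the same directory is seen through the two path schemes on Databricks: local file APIs such as os.listdir and os.path.exists go through the /dbfs/ FUSE mount, while spark.read and dbutils.fs resolve dbfs:/ paths:

import os

# The same directory seen through the two path schemes (illustrative only)
local_path = "/dbfs/FileStore/tables/Calybre Capstone Project - Part 1"   # FUSE mount for local file APIs
spark_path = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1"   # DBFS URI for Spark and dbutils

print(os.path.exists(local_path))    # local file APIs resolve this via the /dbfs/ mount
display(dbutils.fs.ls(spark_path))   # lists the same files via dbfs:/
# spark.read resolves a bare "/dbfs/..." string against DBFS (as dbfs:/dbfs/...),
# which does not exist, hence the "path does not exist" message in the loop above.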
v2g6jxz6 · answer #1

In Databricks, paths should start with dbfs:/ rather than /dbfs/. So update the input_directory variable as follows:

input_directory = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1"

Your output_directory path looks fine.
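
For completeness, a minimal sketch of the loop with that change. Since os.listdir only sees the local /dbfs/ FUSE mount, this sketch lists the files with dbutils.fs.ls (available in Databricks notebooks) so that listing and reading both use dbfs:/ paths; apart from the directory names taken from the question, the details are assumptions rather than code from this answer:

input_directory = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1"
output_directory = "dbfs:/FileStore/tables/Calybre Capstone Project - Part 1/Parquet Files"

# dbutils.fs.ls returns FileInfo objects with .name and .path attributes
for info in dbutils.fs.ls(input_directory):
    if info.name.endswith(".csv"):
        df = spark.read.csv(info.path, header=True, schema=schema)
        df.write.parquet(f"{output_directory}/{info.name}.parquet")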

wmtdaxz3 · answer #2

This is a very inefficient way to process multiple files - you only need to provide a file mask that picks up all the files at once so they can be processed in a single pass (when using the Spark APIs on Databricks you don't need to specify dbfs: or /dbfs):

input_directory = "/FileStore/tables/Calybre Capstone Project - Part 1"
output_directory = "/FileStore/tables/Calybre Capstone Project - Part 1/Parquet Files"

df = spark.read.csv(f"{input_directory}/*.csv", header=True, schema=schema)
df.write.parquet(output_directory)
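
Note that with the wildcard all CSVs end up in a single DataFrame, and df.write.parquet produces one Parquet dataset (a directory of part-*.parquet files) rather than one Parquet file per CSV. If the job may be re-run, a hedged addition is a save mode; mode("overwrite") below is an assumption about the desired behaviour, not part of the original answer:

df = spark.read.csv(f"{input_directory}/*.csv", header=True, schema=schema)
# Overwrite any existing output so repeated runs don't fail on an existing path
df.write.mode("overwrite").parquet(output_directory)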
