在pyspark中读取csv文件,同时强制执行schema,但在最后也忽略额外的列

zdwk9cvp  于 2023-09-28  发布在  Spark
关注(0)|答案(1)|浏览(124)

我正在尝试读取pyspark中的管道分隔文件。我的要求是:
->仅当所有列都按照模式中的给定顺序时才读取文件。如果顺序发生更改,或者缺少特定列,则忽略该文件。我使用选项header='True',enforceSchema=False完成了此操作
->现在的要求是,在这样做的时候,如果在原始文件的末尾有额外的列,那么忽略这些列,但读取文件。当前代码将引发异常,因为在强制模式时,文件和模式中的列数也应该匹配。
这是我用来读取文件的代码。df = spark.read.schema(schema).options(delimiter='|',header='True',enforceSchema=False).csv(file)
我如何达到这一要求?
范例:
假设模式为A、B、C、D
如果文件列为A| B| C| D,读文件并继续。(此部分正在运行)
如果文件列为A| C| B| D,然后忽略文件/引发错误。(此部分正在工作)
如果文件列为A| B| C| D| E,然后也读取文件并继续。不要在这里提出错误。(这部分是我努力实现的)

tgabmvqs

tgabmvqs1#

试试这个:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

schema_str = "A,B,C,D"
schema = StructType.fromDDL(schema_str)

def file_matches_schema(filepath, schema_str, delimiter):
    header = spark.read.options(delimiter=delimiter, header='True', maxRows=1).csv(filepath).columns
    return ','.join(header).startswith(schema_str)

file = "/path/to/your/file.csv"
delimiter = '|'

if file_matches_schema(file, schema_str, delimiter):
    df = spark.read.schema(schema).options(delimiter=delimiter, header='True').csv(file)
else:
    print("File columns do not match the desired schema order or have missing columns!")

相关问题