How to process .csv files by filtering on file name

dsekswqp · asked on 2023-09-28

I have a long list of .csv files in my dataset, but for simplicity let's assume it contains just three files:

  • ASGK_2022.csv
  • ASGK_2023.csv
  • PRAS_2022.csv

I only need to process the files whose name contains "ASGK". In other words, I need to filter the files by file name using the transforms.api.FileSystem.files method. All the files share the same column names.
I filter the files with a regular expression. Here are the two pieces of code I have been trying, so far without success.

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re

@transform(
    output_df=Output(
        ""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:

                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()

                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)

The error I get (traceback abridged):

Traceback (most recent call last):
  File "/myproject/datasets/...", line 27, in compute
    dfs = spark.read.json(rdd)
  ...
py4j.protocol.Py4JJavaError: An error occurred while calling o163.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Lost task 0.0 in stage 0.0 (TID 0) (localhost executor driver): org.apache.spark.api.python.PythonException:
  File ".../transforms-python/src/myproject/datasets/...", line 17, in process_file
    with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
  File ".../site-packages/pandas/io/excel/_base.py", line 1695, in __init__
    self._reader = self._engines[engine](self._io, storage_options=storage_options)
  File ".../site-packages/pandas/io/excel/_openpyxl.py", line 557, in __init__
    super().__init__(filepath_or_buffer, storage_options=storage_options)
  File ".../site-packages/openpyxl/reader/excel.py", line 95, in _validate_archive
    archive = ZipFile(filename, 'r')
  File ".../lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file
The other approach was to use olefile:

def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')

            for row in pdf.to_dict('records'):
                yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)

    output_df.write_dataframe(dfs)

Traceback (most recent call last):
  File "/myproject/datasets/...", line 33, in compute
    dfs = spark.read.json(rdd)
  ...
py4j.protocol.Py4JJavaError: An error occurred while calling o320.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): org.apache.spark.api.python.PythonException:
  File ".../transforms-python/src/myproject/datasets/...", line 18, in process_file
    ole = olefile.OleFileIO(f.read())
  File ".../site-packages/olefile/olefile.py", line 1075, in __init__
    self.open(filename, write_mode=write_mode)
  File ".../site-packages/olefile/olefile.py", line 1169, in open
    self.fp = open(filename, mode)
FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n'
Any help would be appreciated.

0x6upsns · answer #1

The first error looks like the library is trying to interpret your file as a zip archive:

File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file

For the second one, it looks like it is trying to use a line of your csv file as a path?

FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n'
Your regex looks correct, as far as I can tell.
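
If you want to keep the per-file pandas approach, here is only a minimal sketch (untested, reusing the names from your transform, and assuming the files are plain UTF-8 CSVs with a BOM, which the FileNotFoundError above suggests): parse each file with pd.read_csv instead of pd.ExcelFile or olefile.

def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            # Parse as CSV; 'utf-8-sig' strips the BOM (\xef\xbb\xbf) visible in the error above
            pdf = pd.read_csv(f, dtype=str, header=0, encoding='utf-8-sig')
            pdf.columns = pdf.columns.str.lower()
            for row in pdf.to_dict('records'):
                yield json.dumps(row, default=str)

    # Same regex as before: only files whose name contains "ASGK"
    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    dfs = ctx.spark_session.read.json(rdd)
    output_df.write_dataframe(dfs)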
Alternatively, you can read the CSV directly with Spark, as described in this answer: https://stackoverflow.com/a/72312808/5233494

filesystem = raw.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
df = (
    ctx
    .spark_session
    .read
    .option("encoding", "UTF-8")  # UTF-8 is the default
    .option("header", True)
    .option("inferSchema", True)
    .csv(files)
)
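
For completeness, a hedged sketch of how that snippet could be wired into a full transform; the Input/Output paths are left as empty placeholders, raw is renamed to input_raw to match the question, and the result is written back with write_dataframe:

from transforms.api import transform, Input, Output


@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    filesystem = input_raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    # Absolute paths of the files whose name contains "ASGK"
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
    df = (
        ctx.spark_session
        .read
        .option("encoding", "UTF-8")
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
    output_df.write_dataframe(df)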
