I have a long list of .csv files in a dataset, but for simplicity let's assume it contains three files:
- ASGK_2022.csv
- ASGK_2023.csv
- PRAS_2022.csv
I only need to process the files whose names contain "ASGK". In other words, I need to filter the files by filename using the transforms.api.FileSystem.files method. All files share the same column names.
I filter the files with a regex. Here are two pieces of code I have been using, so far without success.
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re
@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()
                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)
The error I get:
Traceback (most recent call last):
  File "/myproject/datasets/... .py", line 27, in compute
    dfs = spark.read.json(rdd)
  File ".../pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File ".../py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(...)
py4j.protocol.Py4JJavaError: An error occurred while calling o163.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Lost task 0.0 in stage 0.0 (TID 0) (localhost, executor driver): org.apache.spark.api.python.PythonException:
  File ".../pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File ".../pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File ".../pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File ".../pyspark/sql/utils.py", line 232, in func
    for x in iterator:
  File ".../src/myproject/datasets/... .py", line 17, in process_file
    with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
  File ".../pandas/io/excel/_base.py", line 1695, in __init__
    self._reader = self._engines[engine](self._io, storage_options=storage_options)
  File ".../pandas/io/excel/_openpyxl.py", line 557, in __init__
    super().__init__(filepath_or_buffer, storage_options=storage_options)
  File ".../pandas/io/excel/_base.py", line 545, in __init__
    self.book = self.load_workbook(self.handles.handle)
  File ".../pandas/io/excel/_openpyxl.py", line 568, in load_workbook
    return load_workbook(...)
  File ".../openpyxl/reader/excel.py", line 344, in load_workbook
    reader = ExcelReader(filename, read_only, keep_vba, ...)
  File ".../openpyxl/reader/excel.py", line 123, in __init__
    self.archive = _validate_archive(fn)
  File ".../openpyxl/reader/excel.py", line 95, in _validate_archive
    archive = ZipFile(filename, 'r')
  File ".../lib/python3.8/zipfile.py", line 1269, in __init__
    self._RealGetContents()
  File ".../lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file
An alternative approach uses olefile:
import olefile

def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')
                for row in pdf.to_dict('records'):
                    yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)
Traceback (most recent call last):
  File "/myproject/datasets/... .py", line 33, in compute
    dfs = spark.read.json(rdd)
  File ".../pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File ".../py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(...)
py4j.protocol.Py4JJavaError: An error occurred while calling o320.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost, executor driver): org.apache.spark.api.python.PythonException:
  File ".../pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File ".../pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File ".../pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File ".../pyspark/sql/utils.py", line 232, in func
    for x in iterator:
  File ".../src/myproject/datasets/... .py", line 18, in process_file
    ole = olefile.OleFileIO(f.read())
  File ".../olefile/olefile.py", line 1075, in __init__
    self.open(filename, write_mode=write_mode)
  File ".../olefile/olefile.py", line 1169, in open
    self.fp = open(filename, mode)
FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n'
Any help would be appreciated.
1 Answer
The first error looks like the library is trying to interpret your file as a zip archive: pd.ExcelFile with engine='openpyxl' expects an .xlsx workbook, which is a zip container, so feeding it plain CSV text fails.

File ".../lib/python3.8/zipfile.py", line 1336, in _RealGetContents raise BadZipFile("File is not a zip file") zipfile.BadZipFile: File is not a zip file
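Since the files are plain CSVs, no Excel reader is needed at all. Here is a minimal, untested sketch of process_file that parses the opened stream with pd.read_csv instead; the encoding='utf-8-sig' argument is an assumption based on the BOM (\xef\xbb\xbf) visible at the start of the second error:

def process_file(file_status):
    with input_raw.filesystem().open(file_status.path, 'rb') as f:
        # utf-8-sig strips the UTF-8 BOM that the raw bytes show (assumption)
        pdf = pd.read_csv(f, dtype=str, header=0, encoding='utf-8-sig')
        pdf.columns = pdf.columns.str.lower()
        for row in pdf.to_dict('records'):
            yield json.dumps(row, default=str)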
For the second one, it looks like olefile is trying to use the content of your CSV file as a path: OleFileIO was handed the raw bytes returned by f.read(), and since they are not OLE data it ends up treating them as a filename.

FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n'
Your regex looks correct, as far as I can tell.
Instead, you can read the CSVs directly with Spark, as described in this answer: https://stackoverflow.com/a/72312808/5233494
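For reference, a minimal sketch of that approach as a transform (untested; it assumes the FileSystem.hadoop_path property and FileSystem.ls method used in the linked answer, with the dataset paths elided as in the question):

from transforms.api import transform, Input, Output

@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    fs = input_raw.filesystem()
    # Absolute Hadoop paths of the files whose names contain "ASGK"
    paths = [fs.hadoop_path + "/" + f.path for f in fs.ls(regex=r".*ASGK.*\.csv$")]
    # Let Spark parse the CSVs directly; all files share the same header row
    df = ctx.spark_session.read.csv(paths, header=True)
    output_df.write_dataframe(df)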