import os
import sys

import pandas as pd
import pyarrow.parquet as pq

# fastparquet must be installed: pandas uses it below via engine='fastparquet'
path ="C:/.../amex-default-prediction/"
parquet="parquet/"
#create new folder train_data
path_train_data="train_data/"
def get_path_parquet(file):
    """Map a CSV file name to the sub-folder its parquet chunks are saved in."""
    name = file.split('.')[0]
    if name == "sample_submission":
        return path_sample_submission
    elif name == "test_data":
        return path_test_data
    elif name == "train_data":
        return path_train_data
    elif name == "train_labels":
        return path_train_label
def csv_to_parquet(df, title, path, i):
    """
    Convert one CSV chunk to parquet.
    df    : chunk of the csv data
    title : name of the csv file
    path  : folder in which to save the parquet data
    i     : index of the chunk, appended to the file name
    """
    try:
        title_prefix = title.split(".")[0] + str(i)
        out_title = os.path.join(path, f'{title_prefix}.parquet')
        df.to_parquet(out_title, engine='fastparquet')
    except Exception as e:
        print(f"failed to write {title}: {e}")
        sys.exit(-1)
def loading_csv_with_chunk(path, file):
    """Read a CSV lazily, 5000 rows at a time, to keep memory usage low."""
    try:
        return pd.read_csv(os.path.join(path, file), low_memory=False, chunksize=5000)
    except Exception as e:
        print(f"failed to read {file}: {e}")
        sys.exit(-1)
def read_partition_parquet():
    """Read all train_data parquet chunks back into a single DataFrame."""
    # the chunks were written under path + parquet + path_train_data;
    # use_legacy_dataset is deprecated in recent pyarrow and has been dropped
    dataset = pq.ParquetDataset(path + parquet + path_train_data)
    return dataset.read().to_pandas()
# convert every CSV in the dataset folder to chunked parquet files
for file in os.listdir(path):
    if file[-4:] == ".csv":
        print("begin process for: " + str(file) + "....")
        # load the data lazily with the chunk method
        chunk_csv = loading_csv_with_chunk(path, file)
        # save each chunk in parquet format
        for i, df_chunk in enumerate(chunk_csv):
            print(df_chunk.shape)
            title_prefix = file.split(".")[0] + str(i)
            out_title = path + parquet + get_path_parquet(file) + f'{title_prefix}.parquet'
            df_chunk.to_parquet(out_title, engine='fastparquet')
        print("end process for: " + str(file) + "....")
To work around the memory problem, you can first import the data with pandas' chunk method and save each chunk as a parquet file. So in your case, create a folder "train_data" and save the parquet files corresponding to these chunks in that folder.
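
To load the result later without re-reading the CSVs, here is a minimal sketch using plain pandas, assuming the chunks sit under path + parquet + path_train_data as written by the loop above (read_partition_parquet does the same through pyarrow in one call):

import os
import pandas as pd

train_dir = path + parquet + path_train_data  # folder holding the chunk files

def chunk_index(name):
    # names look like "train_data12.parquet"; sort by the numeric suffix
    return int(''.join(ch for ch in name if ch.isdigit()))

chunk_files = sorted(
    (f for f in os.listdir(train_dir) if f.endswith(".parquet")),
    key=chunk_index,
)
train_df = pd.concat(
    (pd.read_parquet(os.path.join(train_dir, f), engine='fastparquet') for f in chunk_files),
    ignore_index=True,
)

Note that concatenating everything re-creates the full DataFrame in memory; if that is still too large, keep working chunk by chunk and read the parquet files one at a time instead.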