读取和增加(复制样本并更改某些值).csv中的大型数据集的最有效方式是什么

km0tfn4u  于 2022-09-21  发布在  其他
关注(0)|答案(4)|浏览(122)

目前,我已经设法解决了这个问题,但速度比我需要的要慢。耗时约:500K样本需要1小时,整个数据集是~100M样本,100M样本需要~200小时。

硬件/软件规格:内存8 GB,Windows 11 64位,Python3.8.8

问题是:
我有一个.csv(~13 GB)格式的数据集,其中每个样本都有一个值,每个样本的起止期分别为几个月。我想创建一个数据集,其中每个样本都有相同的值,但引用每个特定的月份。

例如:

出发地:

IDX|开始日期|结束日期|月|年|值
0|20/05/2022|20/07/2022|0|0|X

致:

0|20/05/2022|20/07/2022|5|2022|X
1/20/05/2022|20/07/2022|6|2022|X
2|20/05/2022|20/07/2022|7|2022|X

想法:设法做到并行(就像Dask型的,但我不确定如何完成这项任务)。

我的实施:
在Pandas中阅读组块,在词典中扩充,附加到CSV。使用一个函数,该函数在给定DF的情况下,为每个样本计算从开始日期到结束日期的月份,并为每个月创建一个副本样本,并将其附加到字典中。然后它返回最终的词典。

这些计算都是在词典中进行的,因为它们被发现比在Pandas身上计算要快得多。然后,我以块的形式迭代原始的CSV,并在每个块上应用该函数,将得到的增强的DF附加到另一个CSV。

功能:

def augment_to_monthly_dict(chunk):
    '''
    Function takes a df or subdf  data and creates and returns an Augmented dataset with monthly data in 
    Dictionary form (for efficiency)
    '''
    dict={}
    l=1
    for i in range(len(chunk)):#iterate through every sample
        # print(str(chunk.iloc[i].APO)[4:6] )  
        #Find the months and years period
        mst =int(float((str(chunk.iloc[i].start)[4:6])))#start month
        mend=int(str(chunk.iloc[i].end)[4:6]) #end month
        yst =int(str(chunk.iloc[i].start)[:4] )#start year
        yend=int(str(chunk.iloc[i].end)[:4] )#end year

        if yend==yst:
            months=[ m for m in range(mst,mend+1)]   
            years=[yend for i in range(len(months))]         
        elif yend==yst+1:# year change at same sample
            months=[m for m in range(mst,13)]
            years=[yst for i in range(mst,13)]
            months= months+[m for m in range(1, mend+1)]
            years= years+[yend for i in range(1, mend+1)]
        else:
            continue
        #months is a list of each month in the period of the sample and years is a same 
        #length list of the respective years eg months=[11,12,1,2] , years= 
        #[2021,2022,2022,2022]

        for j in range(len(months)):#iterate through list of months
            #copy the original sample make it a dictionary
            tmp=pd.DataFrame(chunk.iloc[i]).transpose().to_dict(orient='records')

            #change the month and year values accordingly (they were 0 for initiation)

            tmp[0]['month'] = months[j]
            tmp[0]['year'] = years[j]
            # Here could add more calcs e.g. drop irrelevant columns, change datatypes etc 
            #to reduce size
            #
            #-------------------------------------
            #Append new row to the Augmented data
            dict[l] = tmp[0]
            l+=1
    return dict

读取原始数据集(.csv~13 GB),使用函数进行扩充,并将结果附加到新的.csv:

chunk_count=0
for chunk in pd.read_csv('enc_star_logar_ek.csv', delimiter=';', chunksize=10000):

  chunk.index = chunk.reset_index().index

  aug_dict = augment_to_monthly_dict(chunk)#make chunk dictionary to work faster
  chunk_count+=1  

  if chunk_count ==1: #get the column names and open csv write headers and 1st chunk

       #Find the dicts keys, the column names only from the first dict(not reading all data)
       for kk in aug_dict.values():
            key_names = [i for i in kk.keys()] 
            print(key_names)
            break #break after first input dict

       #Open csv file and write ';' separated data
       with open('dic_to_csv2.csv', 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile,delimiter=';', fieldnames=key_names)
            writer.writeheader()
            writer.writerows(aug_dict.values())

  else: # Save the rest of the data chunks
       print('added chunk: ', chunk_count)
       with open('dic_to_csv2.csv', 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile,delimiter=';', fieldnames=key_names)
            writer.writerows(aug_dict.values())
gmxoilav

gmxoilav1#

当您需要操作数据时,Pandas的效率会发挥作用,为此,Pandas逐行读取输入,为每一列构建一系列数据;这是大量额外的计算,您的问题不会从中受益,实际上只会减慢解决方案的速度。

您实际上需要操作ROWS,为此,最快的方法是使用标准的CSV模块;您所需要做的就是读入一行,写出派生的行,然后重复:

import csv
import sys

from datetime import datetime

def parse_dt(s):
    return datetime.strptime(s, r"%d/%m/%Y")

def get_dt_range(beg_dt, end_dt):
    """
    Returns a range of (month, year) tuples, from beg_dt up-to-and-including end_dt.
    """
    if end_dt < beg_dt:
        raise ValueError(f"end {end_dt} is before beg {beg_dt}")

    mo, yr = beg_dt.month, beg_dt.year

    dt_range = []
    while True:
        dt_range.append((mo, yr))
        if mo == 12:
            mo = 1
            yr = yr + 1
        else:
            mo += 1
        if (yr, mo) > (end_dt.year, end_dt.month):
            break

    return dt_range

fname = sys.argv[1]
with open(fname, newline="") as f_in, open("output_csv.csv", "w", newline="") as f_out:
    reader = csv.reader(f_in)
    writer = csv.writer(f_out)
    writer.writerow(next(reader))  # transfer header

    for row in reader:
        beg_dt = parse_dt(row[1])
        end_dt = parse_dt(row[2])
        for mo, yr in get_dt_range(beg_dt, end_dt):
            row[3] = mo
            row[4] = yr
            writer.writerow(row)

而且,为了与一般的Pandas进行比较,让我们来看看@abkey的特定Pandas解决方案--我不确定是否有更好的Pandas实现,但这个解决方案做得有点正确:

import sys
import pandas as pd

fname = sys.argv[1]
df = pd.read_csv(fname)

df["start date"] = pd.to_datetime(df["start date"], format="%d/%m/%Y")
df["end date"] = pd.to_datetime(df["end date"], format="%d/%m/%Y")

df["month"] = df.apply(
    lambda x: pd.date_range(
        start=x["start date"], end=x["end date"] + pd.DateOffset(months=1), freq="M"
    ).month.tolist(),
    axis=1,
)
df["year"] = df["start date"].dt.year

out = df.explode("month").reset_index(drop=True)

out.to_csv("output_pd.csv")

不过,让我们从基础开始,让程序真正做正确的事情。给定以下输入:

idx,start date,end date,month,year,value
0,20/05/2022,20/05/2022,0,0,X
0,20/05/2022,20/07/2022,0,0,X
0,20/12/2022,20/01/2023,0,0,X

我的程序./main.py input.csv产生:

idx,start date,end date,month,year,value
0,20/05/2022,20/05/2022,5,2022,X
0,20/05/2022,20/07/2022,5,2022,X
0,20/05/2022,20/07/2022,6,2022,X
0,20/05/2022,20/07/2022,7,2022,X
0,20/12/2022,20/01/2023,12,2022,X
0,20/12/2022,20/01/2023,1,2023,X

我相信这就是你要找的。

Pandas解决方案./main_pd.py input.csv产生:

,idx,start date,end date,month,year,value
0,0,2022-05-20,2022-05-20,5,2022,X
1,0,2022-05-20,2022-07-20,5,2022,X
2,0,2022-05-20,2022-07-20,6,2022,X
3,0,2022-05-20,2022-07-20,7,2022,X
4,0,2022-12-20,2023-01-20,12,2022,X
5,0,2022-12-20,2023-01-20,1,2022,X

忽略为框架索引添加的列,以及日期格式已更改的事实(我非常确定可以使用某个我不知道的Pandas指令来解决这一问题),它仍然正确地使用适当的日期范围创建新行。

因此,双方都做了正确的事情。现在,让我们来表演吧。我复制了1_000_000和10_000_000行的初始样本,只有1行:

import sys

nrows = int(sys.argv[1])
with open(f"input_{nrows}.csv", "w") as f:
    f.write("idx,start date,end date,month,year,valuen")
    for _ in range(nrows):
        f.write("0,20/05/2022,20/07/2022,0,0,Xn")

我运行的是一台2020年的M1 MacBook Air,配备2TB固态硬盘(读/写速度非常好):

|1M行(秒,RAM)|10M行(秒,RAM)
-|-|
CSV模块|7.8s,6MB|78s,6MB
Pandas|75s,569MB|750s,5.8 GB

您可以看到,这两个程序的运行时间都随着行大小的增加而线性增加。CSV模块的内存仍然完全不存在,因为它是流数据输入和输出(几乎什么都没有);Pandas的内存随着它必须容纳的行的大小而增加,以便它可以执行实际的日期范围计算,同样是在整个列上。此外,没有显示,但对于10M行的Pandas测试,Pandas仅编写CSV就花了近2分钟-比CSV模块方法完成整个任务所用的时间还要长。

现在,对于我对Pandas的所有贬低,解决方案是少得多的代码,而且很可能从一开始就没有错误。我在编写get_dt_range()时确实遇到了问题,我不得不花大约5分钟时间思考它到底需要做什么并进行调试。

您可以使用小的测试工具和结果here查看我的设置。

rmbxnbpk

rmbxnbpk2#

我建议您使用PANDA(甚至DASK)返回一个巨大数据集(例如,.csv~13 GB)中两列之间的月份列表。首先,您需要使用pandas.to_datetime将两列转换为日期时间。然后,您可以使用pandas.date_range来获取您的列表。

试试这个:

import pandas as pd
from io import StringIO

s = """start date   end date    month   year    value
20/05/2022  20/07/2022  0   0   X
"""

df = pd.read_csv(StringIO(s), sep='t')

df['start date'] = pd.to_datetime(df['start date'], format = "%d/%m/%Y")
df['end date'] = pd.to_datetime(df['end date'], format = "%d/%m/%Y")

df["month"] = df.apply(lambda x: pd.date_range(start=x["start date"], end=x["end date"] + pd.DateOffset(months=1), freq="M").month.tolist(), axis=1)
df['year'] = df['start date'].dt.year

out = df.explode('month').reset_index(drop=True)

>>> print(out)

start date   end date month  year value
0 2022-05-20 2022-07-20     5  2022     X
1 2022-05-20 2022-07-20     6  2022     X
2 2022-05-20 2022-07-20     7  2022     X

注意我在100万个.csv数据集上测试了上面的代码,花了大约10分钟才得到输出。

dxxyhpgq

dxxyhpgq3#

您可以使用dask读取超大CSV文件,然后进行处理(与Pandas相同的API),然后根据需要将其转换为Pandas Dataframe 。当Pandas因数据大小或计算速度而失败时,任务是完美的选择。但对于适合RAM的数据,Pandas通常比DaskDataFrame更快、更容易使用。

import dask.dataframe as dd

# 1. read the large csv

dff = dd.read_csv('path_to_big_csv_file.csv') #return Dask.DataFrame

# if still not enough, try more reducing IO costs:

dff = dd.read_csv('largefile.csv', blocksize=25e6) #use blocksize (number of bytes by which to cut up larger files)
dff = dd.read_csv('largefile.csv', columns=["a", "b", "c"]) #return only columns a, b and c

# 2. work with dff, dask has the same api than pandas:

# https://docs.dask.org/en/stable/dataframe-api.html

# 3. then, finally, convert dff to pandas dataframe if you want

df = dff.compute() #return pandas dataframe

您还可以尝试其他替代方案来高效地读取非常大的CSV文件,同时具有高速度和低内存使用率:polamodinkoalas所有这些套餐,和DASK一样,都使用和Pandas类似的API

如果您有非常大CSV文件,pandas read_csv with chunksize通常不会成功,即使成功,也会耗费时间和精力

rqenqsqc

rqenqsqc4#

convtools库中有一个Table助手(我必须承认,这是我的一个库)。这个帮助器将CSV文件作为流处理,在幕后使用简单的csv.reader

from datetime import datetime

from convtools import conversion as c
from convtools.contrib.tables import Table

def dt_range_to_months(dt_start, dt_end):
    return tuple(
        (year_month // 12, year_month % 12 + 1)
        for year_month in range(
            dt_start.year * 12 + dt_start.month - 1,
            dt_end.year * 12 + dt_end.month,
        )
    )

(
    Table.from_csv("tmp/in.csv", header=True)
    .update(
        year_month=c.call_func(
            dt_range_to_months,
            c.call_func(datetime.strptime, c.col("start date"), "%d/%m/%Y"),
            c.call_func(datetime.strptime, c.col("end date"), "%d/%m/%Y"),
        )
    )
    .explode("year_month")
    .update(
        year=c.col("year_month").item(0),
        month=c.col("year_month").item(1),
    )
    .drop("year_month")
    .into_csv("tmp/out.csv")
)

在我的M1Mac上的一个文件上,每行被分解成三行,它每秒处理100K行。在相同结构的100M行的情况下,它应该花费~1000s(<17分钟)。当然,这取决于每个月的周期有多深。

相关问题