pandas 使用polars高效地分块读取csv(使用有限的可用RAM)

rslzwgfq  于 2023-06-04  发布在  其他
关注(0)|答案(2)|浏览(366)

我试图在一台小机器上读取一个大的CSV(大约6.4 Go)(在Windows上的小笔记本电脑,内存为8 Go),然后将其存储到SQLite数据库中(我知道有替代方案,这不是重点)。

  • 如果需要,我使用的文件可以在that page上找到;在“Fichiers”选项卡中,应标记为“Sirene:Fichier StockEtablissementHistorique [.]"。这个文件现在大约有3700万行。*

作为一个Pandas的忠实粉丝,我还是决定尝试一下那些日子里广告很多的极地。
推断出的 Dataframe 也应该被连接到另一个用pl.read_database生成的 Dataframe (它生成一个pl.DataFrame而不生成pl.LazyFrame)。

  • 我的第一次尝试涉及一个LazyFrame,并且(天真地)希望带有low_memory参数的scan_csv足以处理RAM消耗。它完全冻结我的电脑后,过度消耗内存。
  • 我使用n_rows沿着skip_rows_after_header进行了另一次尝试。但是如果pl.read_csv(my_path, n_rows=1_000_000)工作正常,pl.read_csv(my_path, n_rows=1_000_000, skip_rows_after_header=30_000_000)似乎永远都要花时间(比一个简单的循环来查找行数要多得多)。
  • 我也试过pl.read_csv_batched,但似乎也花了很长时间(也许是计算第一个统计数据不是在文档中描述的)。
  • 我发现用polars完全处理文件的唯一方法是处理LazyFrame中的切片并收集它。就像这样:
df = (
    pl.scan_csv(
        url,
        separator=",",
        encoding="utf8",
        infer_schema_length=0,
        low_memory=True,
    )
    .lazy()
    .select(pl.col(my_cols)
    # do some more processing, for instance 
    .filter(pl.col("codePaysEtrangerEtablissement").is_null())
)
chunksize=1_000_000
for k in range(max_iterations:)
    chunk = df.slice(chunksize*k, chunksize).collect()
    chunk = chunk.join(my_other_dataframe, ... )
    # Do some more things like storing the chunk in a database.

这个“解决方案”似乎可以处理内存,但执行速度非常慢。
我发现了另一个解决方案,它似乎工作得很好(我将作为临时答案发布),但使用了pandas read_csv和chunksize。这是一样好,因为是去和工作,只是因为(谢天谢地)没有groupby参与我的过程。
我敢肯定,应该有一个更容易的“纯极性”的方式进行。

编辑

这里提到的另一个 Dataframe (代码示例中的my_other_dataframe)很小。它是一个大约36 k行的 Dataframe ,严格用于将字段“codeCommuneEtablissement”从它的5长字符串转换为存储在另一个表中的整数主键。我在这里的示例中提到了它,以解释为什么你需要更早地收集 Dataframe ,因为你不能加入一个LazyFrame和一个DataFrame。

qyswt5oh

qyswt5oh1#

这是很难回答这个问题,因为我不知道你的其他df看起来像什么,我不知道你到底在做什么,崩溃你的电脑,但这里有一些意见/建议。
arrow,并且,通过扩展,polars并没有针对字符串进行优化,所以你可能做的最糟糕的事情之一就是加载一个巨大的文件,所有的列都作为字符串加载。
来自scan_csv文档

infere_schema_length为推断架构而读取的最大行数。如果设置为0,则所有列将被读取为pl.Utf8。

你把它设置为0,这意味着你正在加载一个巨大的文件作为所有字符串。别这样将其设置为合理的值,或者手动设置dtypes参数。理想情况下,后者。此外,如果有些列是字符串,但具有相对较少的唯一值,则可以将它们设为分类列以保存更多内存。此外,将整数设置为最小的变化也会保存一些内存。Categoricals的一个问题是在这些列上进行连接比较棘手。要绕过这个尝试:

import polars as pl
with pl.StringCache():
    dtypes={
    'siren':pl.UInt32(),
    'nic':pl.UInt32(),
    'siret':pl.UInt64(),
    'dateFin':pl.Date(),
    'dateDebut':pl.Date(),
    'etatAdministratifEtablissement':pl.Categorical(),
    'changementEtatAdministratifEtablissement':pl.Boolean(),
    'enseigne1Etablissement':pl.Categorical(),
    'enseigne2Etablissement':pl.Categorical(),
    'enseigne3Etablissement':pl.Categorical(),
    'changementEnseigneEtablissement':pl.Boolean(),
    'denominationUsuelleEtablissement':pl.Categorical(),
    'changementDenominationUsuelleEtablissement':pl.Boolean(),
    'activitePrincipaleEtablissement':pl.Categorical(),
    'nomenclatureActivitePrincipaleEtablissement':pl.Categorical(),
    'changementActivitePrincipaleEtablissement':pl.Boolean(),
    'caractereEmployeurEtablissement':pl.Categorical(),
    'changementCaractereEmployeurEtablissement':pl.Boolean()
    }

    df=pl.scan_csv("StockEtablissementHistorique_utf8.csv", 
                dtypes=dtypes)
    other_df=pl.read_database(your_query, your_conn_str).with_columns(pl.col(string_cols_to_join_on).cast(pl.Categorical())).lazy()

顺便说一下,一些快速的统计数据...
从这里,如果我做df.collect().estimated_size('gb'),它会返回为4.57。
相比之下,如果我使用pl.read_csv(thefile).estimated_size('gb'),则它是10.5,因此在更改列类型方面有很大的节省。
你可能需要把所有东西都保存在StringCache上下文中,直到你收集为止,我不确定StringCache是如何与LazyFrames交互的。由于运行scan_csv基本上不需要时间,因此它显然没有立即建立分类索引。
您可能还应该将csv重新保存为多个parquet,然后通过pyarrow将其作为数据集延迟加载。

ljo96ir5

ljo96ir52#

这个解决方案使用pandas的方式来分块CSV。我发现Pandas的csv阅读器比Polars更成熟;它通过它的TextFileReader对象很容易地处理内存消耗。
请注意,只要没有涉及groupby,这就可以工作。

reader = (
      pd.read_csv(url, sep=",", encoding="utf8", iterator=True, usecols=my_cols, chunksize=1_000_000
      )
 for chunk in reader:
     df = (
         pl.DataFrame(chunk)
         .lazy()
         # do some more processing, for instance 
         .filter(pl.col("codePaysEtrangerEtablissement").is_null())
         .collect()
     )
     df = df.join(my_other_dataframe, ... )
     # Do some more things like storing the chunk in a database.

相关问题