无法使用Dask合并许多CSV文件(而Pandas工作正常)

bis0qfac  于 2023-05-27  发布在  其他
关注(0)|答案(1)|浏览(180)

我的用例很简单:读取大约70 k个小的CSV-gzip文件,并将它们合并到一个具有公共索引的单个压缩parquet文件中。每个文件压缩后重约3 KB,解压后重约9 KB。所有压缩在一起的文件大小约为200 MB。
串行版本看起来像

dfs = [ ]
for f in tqdm(files):
    df = pd.read_csv(f)
    # do something simple to add a new index column
    dfs.append(df)

final = pd.concat(dfs)
# write final to disk as parquet file

这大约需要2分钟,不使用大量内存,并写入一个大小约为200 MB的文件,这正是我所期望的。
我想到使用Dask优化运行时,因此构建了类似的代码

with Client(n_workers=4) as client:
   df = dd.read_csv(files)
   # do the same thing to add a new column
   final = df.compute() # write it to disk afterwards

我不得不在5分钟的执行时间后停止它,因为在使用了大量的内存后,dask开始在交换中将内容溢出到磁盘。而我只完成了整个进度的5%!加上我不断收到这些警告

distributed.utils_perf - WARNING - full garbage collections took 34% CPU time recently

我希望Dask在类似的任务中具有优势...我做错了什么?

5cnsuln7

5cnsuln71#

以下代码的作用的说明

with Client(n_workers=4) as client:
   df = dd.read_csv(files)
   final = df.compute()

您要求共享整个机器RAM的每个工作进程处理数据块,然后在另一个进程(客户端进程)中将它们构造成单个全局 Dataframe 。这将涉及到在进程之间发送片段的大量序列化以及进程中的内存复制。
Dask很少会给予你一个很大的提升,如果有的话,对于纯数据负载,这往往是由你的存储设备的限制。另外,您已经选择了分布式调度器-请阅读您是否真的想要这个!
Dong .compute()以便您可以写出结果,这在很大程度上是一种反模式,也是dask速度慢的主要原因。如果所有数据都适合一个过程,那么就使用pandas。如果您需要拆分为多个部分以实现核外和/或并行**,请在worker中执行写入**。任何并行/分布式系统都是这样的(包括polars的核外-如果您所做的事情是立即将数据转换为内存中的pandas dataframe,则无法期望节省内存)。
这通常意味着类似于

with Client(n_workers=4) as client:
   df = dd.read_csv(files)
   df2 = df.assign(...) # and other processing
   df2.to_parquet(locaton)

最后,您谈到了设置索引,尽管没有显示。在dask中,这是一种特殊的情况,它对数据进行排序,并且可能非常昂贵(https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.set_index.html)。这就是你需要的吗

相关问题