我有10个非常大的CSV文件(可能具有相同的头,也可能不具有相同的头),我使用“readr”包read_csv_chunked以块的形式连续阅读和处理这些文件().目前,我能够使用10个核心并行读取10个文件。这个过程仍然需要一个小时。我有128个核心。我是否可以将每个CSV拆分为10个块,以便为每个文件并行处理,从而使用100个内核?以下是我目前拥有的(创建两个示例文件仅用于测试):
library(doParallel)
library(foreach)
# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)
write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)
files <- c("df_1.csv", "df_2.csv")
# Create a function to read and filter each file in chunks
my_function <-
function(file) {
library(dplyr)
library(plyr)
library(readr)
filter_df <-
function(x, pos) {
subset(x, X1 %in% filter_by_df$X1 | X2 %in% filter_by_df$X1)
}
readr_df <-
read_csv_chunked(file,
callback = DataFrameCallback$new(filter_df),
progress = F,
chunk_size = 50) %>%
as.data.frame() %>%
distinct()
return(readr_df)
}
# Apply the custom function created above to all files in parallel and combine them
df_foreach <-
foreach(i = files, .combine = rbind.fill, .packages = c("plyr")) %dopar% my_function(i)
我正在研究嵌套foreach(),但不确定如何以嵌套方式传递不同的函数(read_csv_chunked with .合并= rbind和我的自定义函数filter_df()with .combine = rbinf.fill())。我还研究了“future”包。任何建议都非常感谢。谢谢。
3条答案
按热度按时间2ic8powd1#
正如Sirius所提到的,
data.table::fread()
无疑是R
最快的csv阅读器,内置的多线程应该充分利用您所拥有的资源。还有一个想法--因为你只需要最终结果中的一部分行,
arrow
包是一个很好的选择,你可以使用arrow的“下推”功能来扫描多文件数据集,只把符合条件的行读入内存,而不是把整个文件读入内存。arrow
partially implements the functionality of dplyr,因此根据您的经验,语法可能已经很熟悉了。%>%
代替4.1引入的基本R管道-|>
。*ndasle7k2#
对于处理大型csv文件,我只使用data. table中的
fread()
。实现你想要的一个方法是:
它产生如下结果:
正如其他人所提到的,您的系统解析csv的速度很可能比存储后端处理csv数据的速度更快,因此在多核上并行可能没有帮助,或者帮助很小。
另外,正如@罗兰在我编辑出我的第一个并行化方法之前指出的那样,fread已经并行化了,不需要再做一次。
您应该继续查看
fread()
读取文件的速度,然后查看从两个不同的R会话同时阅读同一文件的速度是否相同或更慢。t3irkdon3#
您在硬件和数据大小方面的情况似乎不容易为这里的普通用户复制。其他人已经分享了一些很好的见解,但最终无法访问您的硬件和数据,这一切都停留在猜测。
所以你可以做的是运行一些基准测试来优化你的方法,因为不同的工具会更好地工作,这取决于你的设置瓶颈。
测试数据
选择一组具有代表性的数据。在您最初的问题中,数据集/文件相当小,这带来了独特的挑战(例如,并行化会产生一些开销,这可能会扼杀小文件的速度增益,但对于大文件,您可能会看到巨大的改进,因为开销与总运行时间相比可以忽略不计)。我使用您的测试数据,但是稍微增加数据大小,从问题的Angular 来看,增加了数据点的数量。
函数
我推荐的基准测试工具(
bench
)希望比较不同的函数,因此将代码 Package 到每个方法的函数中是有意义的(有一些变通方法,但这种方法更容易)。使用
furrr
进行并行化的原始方法:箭头功能由Matt Summersgill实现。
Sirius
data.table
方法的两个稍微修改的版本:基准测试
我推荐使用优秀的
bench
包进行基准测试,因为它可以在网格中运行函数,生成非常详细的数据,并绘制一些非常好的图表进行比较。如果在网格中搜索,比较结果的最简单方法是使用图:
但您也可以仔细查看单个运行:
我喜欢看第二个,因为您很快就会看到,例如,带有两个文件的my_data_table_native比my_data_table_native快大约7倍。
最突出的是并行化只会在涉及大量文件时改进原始函数,但并行化总是会损害
data.table
的性能。这可能是因为工作人员相互阻碍,而瓶颈是磁盘读取速度。因此,我的结论是,对于这里使用的文件大小,最多可以使用大约50个常规的data.table
方法,否则使用arrow
。然而,你的硬件和我的有很大的不同。在评论中你说并行化实际上提高了处理速度,这可能会指向CPU瓶颈,而我似乎主要受到我机器的SSD读取速度的限制。Matt Summersgill还指出,
data.table
对于较大的文件来说内存使用量很大,这就可以解释为什么你的机器会因为那个通常非常快的包而变慢。最终你需要自己运行基准测试来找出答案。