我试图读取多个大的csv文件与嵌套并行与未来。
我有一台有32个内核的机器,我想设置嵌套并行(5乘6),每个进程有6个内核。我试图利用data.table::fread(.., nThreads = 6)
的隐式并行。
R包future
提供了嵌套并行,我尝试过
library(future)
plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))
但上面的每个子进程实际上只使用1个内核:
plan(list(tweak(multisession, workers = 5),
tweak(multisession, workers = 6)))
registerDoFuture()
foreach(i = 1:5) %dopar% {
availableCores()
}
[[1]]
mc.cores
1
[[2]]
mc.cores
1
[[3]]
mc.cores
1
[[4]]
mc.cores
1
[[5]]
mc.cores
1
有没有办法做到这一点?
1条答案
按热度按时间w46czmvw1#
(Futureverse维护者在这里)
...但上面的实际上每个子进程只使用1个核心:
我明白这里的误解了。你想在这里使用
nbrOfWorkers()
(from**future)而不是availableCores()
(fromparallelly-reexport as-is fromfuture**)。这将给予你你所期望的:availableCores()
返回1的原因(1)是因为future框架错误地试图阻止嵌套并行化。它通过设置控制并行工作者和CPU核心数量的选项和环境变量来实现这一点,包括options(mc.cores = 1L)
。availableCores()
正确地拾取了这一点。例如,这可以防止使用y <- mclapply(X, FUN)
,cl <- makeCluster(avaiableCores())
,或者plan(multisession)
,如果已经在并行worker中运行。相反,nbrOfWorkers()
反映了plan()
指定的worker数量。在您的示例中,我们在plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))
的第二个级别的并行worker中设置了plan(multisession, workers = 6)
。为了让自己相信您确实在与您的设置并行运行,您可以采用https://future.futureverse.org/articles/future-3-topologies.html中的一个示例。
现在,并行 threads 和并行 processes(也就是parallel worker)是不一样的。你可以把 threads 看作是一种低得多的并行化机制。重要的是,future框架并不 * 限制并行worker中使用的 threads 的数量,包括data.table使用的并行线程的数量。因此,你需要显式调用:
或者,如果你想灵活地适应当前的设置,
避免过度并行,或者您可以将data.table配置为:
顺便说一句,在**doFuture**(〉= 1.0.0)中,如果用
%dofuture%
替换%dopar%
,则不再需要registerDoFuture()
。因此,并行阅读大量CSV文件的要点是:尽管如此,请注意,您的瓶颈可能是文件系统而不是CPU。当您并行阅读文件时,您可能会压倒文件系统,最终会减慢文件读取速度,而不是加快速度。有时候并行读取两到三个文件会更快,但更多的话就会适得其反。因此,您需要使用不同数量的并行工作器进行基准测试。
此外,现在,有一些R包高度专门用于高效地将数据文件读取到R中。其中一些包支持高效地读取多个文件。vroom包就是这样一个例子。