嵌套并行与R未来

ntjbwcob  于 2023-04-09  发布在  其他
关注(0)|答案(1)|浏览(143)

我试图读取多个大的csv文件与嵌套并行与未来。
我有一台有32个内核的机器,我想设置嵌套并行(5乘6),每个进程有6个内核。我试图利用data.table::fread(.., nThreads = 6)的隐式并行。
R包future提供了嵌套并行,我尝试过

library(future)
plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))

但上面的每个子进程实际上只使用1个内核:

plan(list(tweak(multisession, workers = 5), 
          tweak(multisession, workers = 6)))
registerDoFuture()
foreach(i = 1:5) %dopar%  {
  availableCores()
}

[[1]]
mc.cores 
       1 

[[2]]
mc.cores 
       1 

[[3]]
mc.cores 
       1 

[[4]]
mc.cores 
       1 

[[5]]
mc.cores 
       1

有没有办法做到这一点?

w46czmvw

w46czmvw1#

(Futureverse维护者在这里)
...但上面的实际上每个子进程只使用1个核心:
我明白这里的误解了。你想在这里使用nbrOfWorkers()(from**future)而不是availableCores()(fromparallelly-reexport as-is fromfuture**)。这将给予你你所期望的:

> foreach(i = 1:5) %dopar% {
  nbrOfWorkers()
}
[[1]]
[1] 6
...
[[5]]
[1] 6

availableCores()返回1的原因(1)是因为future框架错误地试图阻止嵌套并行化。它通过设置控制并行工作者和CPU核心数量的选项和环境变量来实现这一点,包括options(mc.cores = 1L)availableCores()正确地拾取了这一点。例如,这可以防止使用y <- mclapply(X, FUN)cl <- makeCluster(avaiableCores()),或者plan(multisession),如果已经在并行worker中运行。相反,nbrOfWorkers()反映了plan()指定的worker数量。在您的示例中,我们在plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))的第二个级别的并行worker中设置了plan(multisession, workers = 6)
为了让自己相信您确实在与您的设置并行运行,您可以采用https://future.futureverse.org/articles/future-3-topologies.html中的一个示例。
现在,并行 threads 和并行 processes(也就是parallel worker)是不一样的。你可以把 threads 看作是一种低得多的并行化机制。重要的是,future框架并不 * 限制并行worker中使用的 threads 的数量,包括data.table使用的并行线程的数量。因此,你需要显式调用:

data <- data.table::fread(.., nThreads = 6)

或者,如果你想灵活地适应当前的设置,

data <- data.table::fread(.., nThreads = nbrOfWorkers())

避免过度并行,或者您可以将data.table配置为:

## Set the number of parallel threads used by 'data.table'
## (the default is to use all physical CPU cores)
data.table::setDTthreads(nbrOfWorkers())
data <- data.table::fread(..;)

顺便说一句,在**doFuture**(〉= 1.0.0)中,如果用%dofuture%替换%dopar%,则不再需要registerDoFuture()。因此,并行阅读大量CSV文件的要点是:

library(doFuture)
plan(list(tweak(multisession, workers = 5), 
          tweak(multisession, workers = 6)))

files <- dir(pattern = "*.csv$")
res <- foreach(file = files) %dofuture% {
  data.table::setDTthreads(nbrOfWorkers())
  data.table::fread(file)
}

尽管如此,请注意,您的瓶颈可能是文件系统而不是CPU。当您并行阅读文件时,您可能会压倒文件系统,最终会减慢文件读取速度,而不是加快速度。有时候并行读取两到三个文件会更快,但更多的话就会适得其反。因此,您需要使用不同数量的并行工作器进行基准测试。
此外,现在,有一些R包高度专门用于高效地将数据文件读取到R中。其中一些包支持高效地读取多个文件。vroom包就是这样一个例子。

相关问题