如何在R中并行化这个lapply()函数?

qni6mghb  于 2023-02-06  发布在  其他
关注(0)|答案(1)|浏览(167)

我尝试执行网格搜索,以找到使x和y的线性组合之间的相关性最大化的系数。我的函数采用数据框架,其中每列是要测试该迭代的θ。

corr_grid_search <- function(thetas, modeling_df) {

    # thetas = as.vector(thetas)
    coeff1 = modeling_df$penalty1 / thetas[1]
    coeff2 = modeling_df$penalty2 / thetas[2]
    coeff3 = modeling_df$penalty3 / thetas[3]
    coeff4 = modeling_df$penalty4 / thetas[4]
    coeff5 = modeling_df$penalty5 / thetas[5]
    coeff6 = modeling_df$penalty6 / thetas[6]
    coeff7 = modeling_df$penalty7 / thetas[7]
    coeff8 = modeling_df$penalty8 / thetas[8]
    coeff9 = modeling_df$penalty9 / thetas[9]
    coeff10 = modeling_df$penalty10 / thetas[10]

    df = data.frame(coeff1, coeff2, coeff3, coeff4, coeff5, coeff6, coeff7, coeff8, coeff9, coeff10)

    pp_1 = modeling_df$x1 / df$coeff1
    pp_2 = modeling_df$x2 / df$coeff2
    pp_3 = modeling_df$x3 / df$coeff3
    pp_4 = modeling_df$x4 / df$coeff4
    pp_5 = modeling_df$x5 / df$coeff5
    pp_6 = modeling_df$x6 / df$coeff6
    pp_7 = modeling_df$x7 / df$coeff7
    pp_8 = modeling_df$x8 / df$coeff8
    pp_9 = modeling_df$x9 / df$coeff9
    pp_10 = modeling_df$x10 / df$coeff10

    recip = 1/df[, c('coeff1', 'coeff2', 'coeff3', 
    'coeff4', 'coeff5', 'coeff6', 
    'coeff7', 'coeff8', 'coeff9', 'coeff10')]
    recip = as.data.frame(lapply(recip, function(x) replace(x, is.infinite(x), NA)))

    df = data.frame(pp_1, pp_2, pp_3, pp_4, pp_5, pp_6, pp_7, 
        pp_8, pp_9, pp_10)

    weighted_x = rowSums(df, na.rm=T) / 
        rowSums(recip, na.rm=T)

    cor(weighted_x[!is.na(weighted_x)], 
           modeling_df[!is.na(weighted_x),]$y)
}

我用lapply()运行它,如下所示:

lapply(blah, corr_grid_search, modeling_df)

但是我试图并行化它,遇到了麻烦。我尝试过的两种方法使用了parallel和future. apply库,但都没有工作:
一个二个一个一个
这两种方法都出了问题,因为它们耗时长得可怕,慢了2 - 3个数量级。我在这里做错了什么?

ycl3bljg

ycl3bljg1#

并行处理并不总是比单线程快

在我的经验中,有两种常见的情况是并行处理比使用单线程慢:
1.数据量很大,在工作人员之间复制数据的成本很高。
1.分配给每个工作线程的任务足够快,以至于设置工作线程的开销会产生显著影响。

1.在工作线程之间复制数据所花费的时间比并行处理节省的时间要多

furrrdocs所示:
记住数据必须在工作者之间来回传递是很重要的。这意味着无论您从并行化中获得了什么性能提升,都可能被移动大量数据所粉碎。例如,如果您要将大型 Dataframe 移动到工作者,并行运行模型,并返回大型模型对象,那么数据的移动可能会占用大量时间。

快速模拟

如果我们定义一个低效函数,就可以看到这样的例子。此函数计算数据框列的平均值,但不是返回单个值,而是创建一个循环使用该值的新数据框列。然后返回附加了此新列的整个数据框:

silly_fun <- function(dat, col_name) {
    mean_col_name <- paste0(col_name, "_mean")
    dat[[mean_col_name]] <- mean(dat[[col_name]])

    return(dat)
}

对 Dataframe 的每一列都运行此函数是个坏主意,无论是否并行。
所以,让我们尝试一下只有100行和100列的情况,看看会发生什么:

library(future.apply)
plan(multisession)

nrows <- 100
ncols <- 100
dat <- data.frame(
    matrix(rnorm(nrows * ncols), nrows, ncols)
)

res <- microbenchmark::microbenchmark(
    single_thread = lapply(names(dat), \(col_name) silly_fun(dat, col_name)),
    parallel = future_lapply(names(dat), \(col_name) silly_fun(dat, col_name)),
    times = 100
)

输出:

Unit: milliseconds
          expr      min        lq       mean    median        uq      max neval cld
 single_thread   2.9771   3.26725   3.876938   3.43705   3.91215   9.6273   100  a 
      parallel 103.5295 114.23415 126.105709 123.41755 132.39925 235.1055   100   b

如您所见,并行作业的中值大约是单线程作业的35倍。
如果我们用1000行和1000列来尝试,结果如下:

Unit: milliseconds
          expr        min         lq       mean     median         uq        max neval
 single_thread   168.5477   168.5477   168.5477   168.5477   168.5477   168.5477     1
      parallel 29440.3962 29440.3962 29440.3962 29440.3962 29440.3962 29440.3962     1

在这里,并行运行需要175倍的时间,您可以看到我在这里只做了一次迭代-单线程迭代需要0.168秒,而并行运行需要29.4秒!

2.设置工作线程的开销可能比节省的时间更昂贵

让我们举一个更合理的例子,只返回实际的均值,而不是整个 Dataframe ,包含1000行和列。你可能会认为,现在我们没有传递大量的数据,并行处理会更快。让我们看看:

nrows <- 1e3
ncols <- 1e3

dat <- data.frame(
    matrix(rnorm(nrows * ncols), nrows, ncols)
)
sensible <- microbenchmark::microbenchmark(
    single_thread = lapply(dat, mean),
    parallel = future_lapply(dat, mean),
    times = 10
)

Unit: milliseconds
          expr      min        lq       mean    median        uq      max neval cld
 single_thread   4.3159   4.65055   5.245647   4.88995   5.37955  10.3636   100  a 
      parallel 157.9709 163.17605 177.565840 169.55155 180.03720 513.5421   100   b

这两种方法在绝对时间上都表现得更好(两种情况下,1000行和1000列看起来就像使用愚蠢的方法时各自的100行和1000列)。
但是使用单个线程仍然要快得多,因为与相对较快的计算平均值的操作相比,设置工作线程的开销很高。
总的来说,并行作业可能不会更快,但这并不意味着代码没有按预期创建子进程。并行处理有成本也有好处。在适当的情况下,它会快很多,但如果您要复制大型数据集或创建进程来执行非常简单的计算,它可能会比单线程方法慢。

相关问题