r—使用sparklyr运行并行函数调用

cclgggtu  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(411)

目前,我正在使用doparallel库中的foreach循环在同一台机器的多个内核上并行运行函数调用,如下所示:

out_results=foreach(i =1:length(some_list))%dopar%
{
   out=functions_call(some_list[[i]])
   return(out)
}

这个列表是一个Dataframe列表,每个Dataframe有不同的列数,函数call()是一个函数,它对数据执行多种操作,例如数据操作,然后使用随机林进行变量选择,最后执行最小二乘拟合。变量out也是一个由3个Dataframe组成的列表,out\u结果将是一个列表。我正在使用cran库和一些由我在函数调用中创建的自定义库,我希望避免使用sparkml库,因为它们的功能有限,并且需要重写整个代码。
我想利用spark并行运行这些函数调用。有可能吗?如果是,我应该往哪个方向想。我已经阅读了很多来自sparkyr的文档,但是似乎没有什么帮助,因为这里提供的示例非常简单。

8xiog9wr

8xiog9wr1#

sparkyr的主页给出了分布在spark集群上的任意r代码的示例。特别是,请参见它们的分组操作示例。
您的主结构应该是一个Dataframe,您将按行处理它。可能类似以下情况(未测试):

some_list = list(tibble(a=1[0]), tibble(b=1), tibble(c=1:2))
all_data = tibble(i = seq_along(some_list), df = some_list)

# Replace this with your actual code.

# Should get one dataframe and produce one dataframe.

# Embedded dataframe columns are OK

transform_one = function(df_wrapped) {
  # in your example, you expect only one record per group
  stopifnot(nrow(df_wrapped)==1)
  df = df_wrapped$df

  res0 = df
  res1 = tibble(x=10)
  res2 = tibble(y=10:11)

  return(tibble(res0 = list(res0), res1 = list(res1), res2 = list(res2)))
}

all_data %>% spark_apply(
  transform_one,
  group_by = c("i"), 
  columns = c("res0"="list", "res1"="list", "res2"="list"),
  packages = c("randomForest", "etc")
)

总而言之,这种方法看起来很不自然,好像我们是在强迫一项不太适合的任务使用spark。也许你应该检查另一个并行化框架?

相关问题