如何使用R以块的形式并行读取多个大型CSV?

qmb5sa22  于 2023-03-15  发布在  其他
关注(0)|答案(3)|浏览(125)

我有10个非常大的CSV文件(可能具有相同的头,也可能不具有相同的头),我使用“readr”包read_csv_chunked以块的形式连续阅读和处理这些文件().目前,我能够使用10个核心并行读取10个文件。这个过程仍然需要一个小时。我有128个核心。我是否可以将每个CSV拆分为10个块,以便为每个文件并行处理,从而使用100个内核?以下是我目前拥有的(创建两个示例文件仅用于测试):

library(doParallel)
library(foreach)

# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)

write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)

files <- c("df_1.csv", "df_2.csv")

# Create a function to read and filter each file in chunks
my_function <-
  function(file) {
    library(dplyr)
    library(plyr)
    library(readr)
    
    filter_df <-
      function(x, pos) {
        subset(x, X1 %in% filter_by_df$X1 | X2 %in% filter_by_df$X1)
      }
    
    readr_df <-
      read_csv_chunked(file,
                       callback = DataFrameCallback$new(filter_df),
                       progress = F,
                       chunk_size = 50) %>%
      as.data.frame() %>%
      distinct()
    return(readr_df)
  }

# Apply the custom function created above to all files in parallel and combine them
df_foreach <-
  foreach(i = files, .combine = rbind.fill, .packages = c("plyr")) %dopar% my_function(i)

我正在研究嵌套foreach(),但不确定如何以嵌套方式传递不同的函数(read_csv_chunked with .合并= rbind和我的自定义函数filter_df()with .combine = rbinf.fill())。我还研究了“future”包。任何建议都非常感谢。谢谢。

2ic8powd

2ic8powd1#

正如Sirius所提到的,data.table::fread()无疑是R最快的csv阅读器,内置的多线程应该充分利用您所拥有的资源。
还有一个想法--因为你只需要最终结果中的一部分行,arrow包是一个很好的选择,你可以使用arrow的“下推”功能来扫描多文件数据集,只把符合条件的行读入内存,而不是把整个文件读入内存。
arrowpartially implements the functionality of dplyr,因此根据您的经验,语法可能已经很熟悉了。

library(arrow)
library(dplyr)

DS <- arrow::open_dataset(sources = c("df_1.csv","df_1.csv"),
                          format = "csv")

DS |> 
  filter(X1 %in% filter_by_df[["X1"]]| X2 %in% filter_by_df[["X1"]] ) |> 
  distinct() |> 
  collect()
  • 请注意-此答案假设R版本为4.1或更高版本。对于早期的R版本,可以使用magrittr管道%>%代替4.1引入的基本R管道-|>。*
ndasle7k

ndasle7k2#

对于处理大型csv文件,我只使用data. table中的fread()
实现你想要的一个方法是:

library(data.table)
library(purrr)

# Create a list of two sample CSV files and a filter by file
set.seed(100)
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)

write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)

files <- c("df_1.csv", "df_2.csv")

filedata <- map(
    files,
    ~{
        d <- fread(.x)
        d[ X1 %in% filter_by_df$X1 | X2 %in% filter_by_df$X1 ]
    }
)

df_final <- rbindlist( filedata, fill=TRUE )

head( df_final )

它产生如下结果:

> head( df_final )
    X1  X2  X3 X4
1: 206 100  54 NA
2:   4 276 144 NA
3:  98  27 205 NA
4:   7 159 134 NA
5: 146  71 180 NA
6: 258  39 176 NA

正如其他人所提到的,您的系统解析csv的速度很可能比存储后端处理csv数据的速度更快,因此在多核上并行可能没有帮助,或者帮助很小。
另外,正如@罗兰在我编辑出我的第一个并行化方法之前指出的那样,fread已经并行化了,不需要再做一次。
您应该继续查看fread()读取文件的速度,然后查看从两个不同的R会话同时阅读同一文件的速度是否相同或更慢。

t3irkdon

t3irkdon3#

您在硬件和数据大小方面的情况似乎不容易为这里的普通用户复制。其他人已经分享了一些很好的见解,但最终无法访问您的硬件和数据,这一切都停留在猜测。
所以你可以做的是运行一些基准测试来优化你的方法,因为不同的工具会更好地工作,这取决于你的设置瓶颈。

测试数据

选择一组具有代表性的数据。在您最初的问题中,数据集/文件相当小,这带来了独特的挑战(例如,并行化会产生一些开销,这可能会扼杀小文件的速度增益,但对于大文件,您可能会看到巨大的改进,因为开销与总运行时间相比可以忽略不计)。我使用您的测试数据,但是稍微增加数据大小,从问题的Angular 来看,增加了数据点的数量。

df_1 <- data.frame(matrix(sample(1:30000), ncol = 3))
df_2 <- data.frame(matrix(sample(1:20000), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)

write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)

函数

我推荐的基准测试工具(bench)希望比较不同的函数,因此将代码 Package 到每个方法的函数中是有意义的(有一些变通方法,但这种方法更容易)。
使用furrr进行并行化的原始方法:

library(dplyr)
library(readr)
library(furrr)

filter_df <- function(x, pos) {
  subset(x, X1 %in% filter_by_df$X1 | X2 %in% filter_by_df$X1)
}

my_function <- function(files, cores) {
  plan(multisession, workers = cores)
  future_map_dfr(files, function(file) {
    suppressMessages(read_csv_chunked(file,
                                      callback = DataFrameCallback$new(filter_df),
                                      progress = FALSE,
                                      chunk_size = 50)) %>%
      as.data.frame() %>%
      distinct()
  })
}

箭头功能由Matt Summersgill实现。

library(arrow)

my_arrow <- function(files, cores) { # cores are ignored, just for comparability
  
  DS <- arrow::open_dataset(sources = files,
                            format = "csv")
  
  DS |> 
    filter(X1 %in% filter_by_df[["X1"]]| X2 %in% filter_by_df[["X1"]] ) |> 
    distinct() |> 
    collect()
}

Siriusdata.table方法的两个稍微修改的版本:

library(data.table)
my_data_table_native <- function(files, cores) {
  setDTthreads(cores)
  df_out <- data.table::rbindlist(lapply(files, data.table::fread), fill = TRUE)[ X1 %in% filter_by_df$X1 | X2 %in% filter_by_df$X1 ]
}

my_data_table_furrr <- function(files, cores) {
  setDTthreads(cores)
  plan(multisession, workers = cores)
  future_map_dfr(files, function(file) {
    data.table::fread(file)[ X1 %in% filter_by_df$X1 | X2 %in% filter_by_df$X1 ]
  })
}

基准测试

我推荐使用优秀的bench包进行基准测试,因为它可以在网格中运行函数,生成非常详细的数据,并绘制一些非常好的图表进行比较。

  • 不同数量的文件会发生什么
  • 平行化有什么不同
results <- bench::press(
  n_files = c(2, 5, 10, 25, 50, 100, 200, 1000), # test different resamplings
  n_cores = 1:4, # test different numbers of cores
  {
    bench_sample <- sample(c("df_1.csv", "df_2.csv"), n_files, replace = TRUE)
    bench::mark(
      my_function = my_function(bench_sample, n_cores),
      my_arrow = my_arrow(bench_sample, n_cores),
      my_data_table_native = my_data_table_native(bench_sample, n_cores),
      my_data_table_furrr = my_data_table_furrr(bench_sample, n_cores),
      iterations = 5L, # rerun each combination 5 times
      check = FALSE # usually bench checks if the outcome is 100% identical, but the approaches differ in insubstantial ways
    )
  }
)

如果在网格中搜索,比较结果的最简单方法是使用图:

ggplot2::autoplot(results)

但您也可以仔细查看单个运行:

summary(results)
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.

# A tibble: 128 × 8
   expression           n_files n_cores      min   median `itr/sec` mem_alloc
   <bch:expr>             <dbl>   <int> <bch:tm> <bch:tm>     <dbl> <bch:byt>
 1 my_function                2       1 125.47ms 125.71ms      7.46   11.79MB
 2 my_arrow                   2       1  47.39ms  50.93ms     18.3    15.85MB
 3 my_data_table_native       2       1   2.61ms   2.68ms    197.      3.21MB
 4 my_data_table_furrr        2       1  17.95ms  18.27ms     51.2     1.28MB
 5 my_function                5       1 256.08ms 256.81ms      3.87    8.11MB
 6 my_arrow                   5       1  42.06ms   43.8ms     22.2   167.66KB
 7 my_data_table_native       5       1   5.76ms   6.19ms    166.      2.51MB
 8 my_data_table_furrr        5       1  22.44ms   22.6ms     42.4     2.37MB
 9 my_function               10       1 402.88ms 409.96ms      2.44   12.06MB
10 my_arrow                  10       1  47.22ms  47.88ms     20.0   168.19KB
# … with 118 more rows, and 1 more variable: `gc/sec` <dbl>
summary(results, relative = TRUE)
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.

# A tibble: 128 × 8
   expression           n_files n_cores    min median `itr/sec` mem_al…¹ gc/se…²
   <bch:expr>             <dbl>   <int>  <dbl>  <dbl>     <dbl>    <dbl>   <dbl>
 1 my_function                2       1  53.5   53.0       323.    72.2      Inf
 2 my_arrow                   2       1  20.2   21.5       791.    97.1      Inf
 3 my_data_table_native       2       1   1.11   1.13     8505.    19.7      Inf
 4 my_data_table_furrr        2       1   7.66   7.71     2216.     7.84     NaN
 5 my_function                5       1 109.   108.        168.    49.7      Inf
 6 my_arrow                   5       1  17.9   18.5       960.     1.00     Inf
 7 my_data_table_native       5       1   2.46   2.61     7166.    15.4      NaN
 8 my_data_table_furrr        5       1   9.57   9.53     1833.    14.5      Inf
 9 my_function               10       1 172.   173.        105.    73.9      Inf
10 my_arrow                  10       1  20.1   20.2       863.     1.01     Inf
# … with 118 more rows, and abbreviated variable names ¹​mem_alloc, ²​`gc/sec`

我喜欢看第二个,因为您很快就会看到,例如,带有两个文件的my_data_table_native比my_data_table_native快大约7倍。
最突出的是并行化只会在涉及大量文件时改进原始函数,但并行化总是会损害data.table的性能。这可能是因为工作人员相互阻碍,而瓶颈是磁盘读取速度。因此,我的结论是,对于这里使用的文件大小,最多可以使用大约50个常规的data.table方法,否则使用arrow
然而,你的硬件和我的有很大的不同。在评论中你说并行化实际上提高了处理速度,这可能会指向CPU瓶颈,而我似乎主要受到我机器的SSD读取速度的限制。Matt Summersgill还指出,data.table对于较大的文件来说内存使用量很大,这就可以解释为什么你的机器会因为那个通常非常快的包而变慢。最终你需要自己运行基准测试来找出答案。

相关问题