R语言 是否可以使用箭头按列而不是按列值对数据集进行分区?

lstz6jyr  于 2023-05-20  发布在  其他
关注(0)|答案(1)|浏览(102)

我有一个函数,它创建了一个data.table,其中包含大约2900万行和用户定义的基于输入样本列表的列数。它读取带有索引列的单个示例文件,并将它们逐列连接到主索引列,以创建这个大data.table
| 指数|样品1|样品2|
| --------------|--------------|--------------|
| 1| 1| 2|
| 2| 3|四|
| ......这是什么?|......这是什么?|......这是什么?|
| 29米|5|六|
由于一次完成这些操作会占用大量内存,所以我想一次读入几个文件,将它们加入索引,将它们作为parquet文件写入磁盘,然后从内存中清除它们。从文档中,我无法判断这是否可能,或者是否只有在完整的数据集存在并且只有列中的值而不是列名本身时才可以对文件进行分区。做一些像

arrow::write_dataset(DT, partitioning = c("sample1", "sample2", ...)

给出了一个目录,该目录中的子目录数与每列中的值数相同。
我尝试过将parquet写入一个目录的方法,但是将它们读回来很困难,因为我不知道如何读取多个parquet文件并按列连接它们。我在这里问了另一个问题:How to write multiple arrow/parquet files in chunks while reading in large data quantities of data so that all written files are one dataset?
如果我有10个示例文件,并希望将它们分成5个一组,我希望创建一个包含2个parquet文件的目录,每个文件都有索引列和5个示例组。

part-1.parquet: index, sample1, sample2, sample3, sample4, sample5
part-2.parquet: index, sample6, sample7, sample8, sample9, sample10

我也可以有三个文件-一个索引Parquet,一个Parquet与前5个样本,另一个与最后5个样本-如果列重复是不好的做法。

index.parquet: index
part-1.parquet: sample1, sample2, sample3, sample4, sample5
part-2.parquet: sample6, sample7, sample8, sample9, sample10
nr9pn0ug

nr9pn0ug1#

如果我理解正确的话:

  • 箭头进行分区的“正常”方式是按的组,假设它们都共享相同的列;而
  • 你想读取多个 parquet 文件,并将它们添加为

我认为最好的方法是做一个懒惰的读取(有或没有过滤)和Reduceleft_join在任何列是一个id字段(必须存在于所有)。
(Side注意:我不认为这可以通过盲目地使用cbindbind_cols来完成,但即使我们可以,如果在没有其他文件的情况下更新任何文件,也会有点不安全。
可重复的例子。

dir.create("/tmp/pqdir")
mt <- as.data.table(mtcars)
mt[, id := .I]
setcolorder(mt, "id")
for (nm in names(mt)[-1]) arrow::write_parquet(mt[, .SD, .SDcols=c("id",nm)], file.path("/tmp/pqdir/", paste0(nm, ".pq")))

此时,我们有一组parquet文件,其中每个文件都有id和另一列。事实上,它总是“另一列”是完全巧合,这种方法的工作原理,以及如果一个 parquet 文件有100和其他有不同的数字。唯一真正必要的是它们共享一个共同的id来合并。
为了清楚起见,有两个这样的文件:

library(dplyr)
arrow::open_dataset("/tmp/pqdir/am.pq") %>%
  head(3) %>%
  collect()
#       id    am
#    <int> <num>
# 1:     1     1
# 2:     2     1
# 3:     3     1
arrow::open_dataset("/tmp/pqdir/cyl.pq") %>%
  head(3) %>%
  collect()
#       id   cyl
#    <int> <num>
# 1:     1     6
# 2:     2     6
# 3:     3     4

我应该说我使用dplyr来演示这一点。虽然我知道你说你正在使用data.table,但dplyr的添加允许我们进行“懒惰”拉取,只在最后引入数据。
我建议我们懒洋洋地打开每个 parquet 文件,并懒洋洋地将它们连接在一起。我将在base::Reduce中使用dplyr::full_join的组合(我猜purrr::reduce也可以工作)。

combined <- lapply(list.files("/tmp/pqdir", full.names=TRUE)[c(1,3,5,6)], 
                   arrow::open_dataset) |>
  Reduce(function(prev, this) full_join(prev, this, by = "id"), x = _)
combined
# FileSystemDataset (query)
# id: int32 (coalesce(id.x, id.y))
# am: double
# cyl: double
# drat: double
# gear: double
# See $.data for the source Arrow object

我之所以没有使用collect,只是为了说明,虽然我们已经访问了四个(任意选择的)文件并启动了连接过程,但我们还没有提取数据,因为您要处理数百万行,所以保留了一些回旋余地。
从这里,我们可以collect

combined %>%
  collect()
#        id    am   cyl  drat  gear
#     <int> <num> <num> <num> <num>
#  1:     1     1     6  3.90     4
#  2:     2     1     6  3.90     4
#  3:     3     1     4  3.85     4
#  4:     4     0     6  3.08     3
#  5:     5     0     8  3.15     3
#  6:     6     0     6  2.76     3
#  7:     7     0     8  3.21     3
#  8:     8     0     4  3.69     4
#  9:     9     0     4  3.92     4
# 10:    10     0     6  3.92     4
# ---                              
# 23:    23     0     8  3.15     3
# 24:    24     0     8  3.73     3
# 25:    25     0     8  3.08     3
# 26:    26     1     4  4.08     4
# 27:    27     1     4  4.43     5
# 28:    28     1     4  3.77     5
# 29:    29     1     8  4.22     5
# 30:    30     1     6  3.62     5
# 31:    31     1     8  3.54     5
# 32:    32     1     4  4.11     4

这样做的最大缺点是每次加载数据时都要进行多次连接;这可能是你关心的问题,也可能不是
您可以通过以下方式更新此流程:

  • filter每个 parquet 文件单独或全部使用相同的过滤(也许在id?);及
  • 使用其他*_join动词之一,以防您的筛选对于您打开的每个文件中存在的任何列都是唯一的。

我只装了四个,你可以根据需要选择装得多或少。
我建议您大部分时间要使用的列应该放在一个parquet文件中,并将其他列作为“选项”加入。关于上下文内存管理:-)

相关问题