将SQLite表导出到Apache parquet而不创建 Dataframe

3ks5zfa0  于 2023-02-09  发布在  SQLite
关注(0)|答案(1)|浏览(194)

我有多个巨大的CSV文件,我必须基于Apache Parquet导出,并基于多个条件/键(=列值)将它们拆分为更小的文件。据我所知,Apache arrow是R包,允许使用Apache parquet文件。
我在一个共享的实验室环境中工作,考虑到有限的RAM内存(与在同一环境中同时工作的用户数量相比),建议我们在本地SQLite数据库中创建 Dataframe ,而不是将它们导入内存(RAM)。
下面的伪代码显示了如何将CSV文件导入到本地SQLite数据库中,在下面的代码中我使用了sqldftidyverse包。

input_file_path <- "D:/tmp/mydata.csv"
db_file_path <- "D:/tmp/db_tmp_sqlite.db"
unlink(db_file_path)
sqldf(str_c("attach '", db_file_path, "' as new"))
sqldf(read.csv.sql(
    file = input_file_path,
    sql = "
        create table mytable as
        select
            . . .
        from
            file
    ",
    `field.types` = list(
      . . .
    ),
    ##
    header = TRUE,
    sep = ",",
    eol = "\n",
    dbname = db_file_path,
    drv = "SQLite"
))

这和预期的一样好用,我的表被创建,我可以运行所有需要的SQL查询,特别是添加补充变量(我的表中的列),这些变量稍后将用作将我的表导出为Apache Parquet格式的键。但是,基于Apache Arrow for R Cheatsheet,允许基于Apache Parquet导出我的数据的函数write_dataset需要一个dataframe
这正是我的问题,因为R中的 Dataframe 在内存中,而我前面解释的数据在SQLite本地数据库中,这意味着首先我必须执行SELECT将整个数据导出到RAM中,类似于

df <- sqldf("select * from mytable", dbname = ...)

只有这样,我才能使用write_dataset,并将创建的df Dataframe 作为其第一个参数,以便基于Apache Parquet导出和拆分数据。但这不是我想要做的。考虑到共享环境中现有的资源限制(内存不足),关键是将数据放在SQLite中,而不是放在内存(RAM)中。
有没有办法在R程序中直接从SQLite转换到Apache Parquet,而不在导出之前先将整个数据放入 Dataframe 中,或者我正在尝试做一些根本不可能的事情?

brvekthn

brvekthn1#

DuckDB有几个很棒的特性,包括能够 * 本机 * 导入和导出CSV和 parquet 格式,而不会影响R内存。

TL; DR

con <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
DBI::dbExecute(con, "copy (select * from read_csv_auto('quux.csv', SAMPLE_SIZE=-1)) to 'quux3.pq' (format parquet)")

仅此而已。数据永远不会导入到R中。(现在,DuckDB是否可以在不耗尽内存的情况下自己完成是我没有在本地验证的另一个问题...)

  • 买者须知 *:然而,在你盲目地相信这一点之前,我强烈建议你做一些类的验证。2大部分可以用duckdb以一种"懒惰"的方式轻松地完成,而不必将整个框架加载到R中。3我鼓励你阅读更多关于原生查询CSV/parquet文件的文档(不必加载到R中)。

方法

为了比较这两种方法(通过data.frame(您不想这样做)和通过duckdb),我们将使用"RSS"(来自ps::ps_memory_info())来指示当前R进程内存使用情况。

* 'rss': "Resident Set Size", this is the non-swapped physical
          memory a process has used (bytes). On UNIX it matches "top"‘s
          'RES' column (see doc). On Windows this is an alias for
          'wset' field and it matches "Memory" column of 'taskmgr.exe'.

尽管这是对R的真实影响的不完美度量,但它确实表明使用DuckDB时对R的影响要小得多。
另外,每个方法都是在R --vanilla的一个新示例中完成的,没有加载.Rprofile或site-init文件,您看到的代码就是执行的代码,仅此而已。

通过 Dataframe 输入R

Sys.getpid()
# [1] 20860

file.info("quux.csv")["size"] / (1024^2) # MBs
#              size
# quux.csv 299.3079
mem1 <- ps::ps_memory_info()["rss"]
dat <- read.csv("quux.csv")
mem2 <- ps::ps_memory_info()["rss"]
arrow::write_parquet(dat, "quux1.pq")
mem3 <- ps::ps_memory_info()["rss"]
c(mem1, mem2, mem3, diff = mem3 - mem1) / (1024^2)
#        rss        rss        rss   diff.rss 
#   57.70703 1218.55859 1548.54688 1490.83984

这表示在读取全部数据后,R增加了1490MB *(仅供参考,data.table::fread代替read.csv只增加了408MB内存,同样的情况。不过,我并不想优化这部分:-)
(FYI,这些数字在不同的运行中会有所不同,而且可能会因本答案范围之外的其他因素而有所不同。我的笔记本电脑有64GB内存,可能无法与您所看到的完全相比。)

DuckDB,从CSV读取,写入 parquet

Sys.getpid()
# [1] 32485

mem1 <- ps::ps_memory_info()["rss"]
con <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
DBI::dbExecute(con, "copy (select * from read_csv_auto('quux.csv')) to 'quux2.pq' (format parquet)")
# [1] 1000207
mem2 <- ps::ps_memory_info()["rss"]
c(mem1, mem2, diff=mem2 - mem1) / (1024^2)
#      rss      rss diff.rss 
# 63.23828 86.35938 23.12109

显示在此过程中只有23MB。

比较结果文件。

file.info(list.files(pattern = "quux.*"))["size"] /  (1024^2)
#               size
# quux.csv 299.30786
# quux1.pq  51.69008
# quux2.pq  66.84857

较大的文件是由于下面提到的类中的差异。我的猜测是,如果我们 * 强制 * 一些character列为logical,那么它的文件大小可能会减小。
更深入地了解一下内容:

ds1 <- arrow::open_dataset("quux1.pq")
ds2 <- arrow::open_dataset("quux2.pq")
identical(names(ds1), names(ds2))
# [1] TRUE

data.frame(
  ds1 = sapply(head(ds1, 1), function(z) class(z)[1]),
  ds2 = sapply(head(ds2, 1), function(z) class(z)[1])
)
#           ds1       ds2
# V1  character character
# V2    integer   integer
# V3  character character
# V4    integer   integer
# V5    logical character
# V6    integer   integer
# V7  character   POSIXct
# V8    logical character
# V9    numeric   numeric
# V10   numeric   numeric
# V11   numeric   integer
# V12   integer   integer
# V13   integer   integer
# V14   integer   integer
# V15   numeric   numeric
# V16   integer   integer
# V17   integer   integer
# V18   numeric   numeric
# V19   numeric   numeric
# V20   logical character
# V21   numeric   numeric
# V22   numeric   numeric
# V23   numeric   numeric
# V24   integer   integer
# V25   logical character
# V26   integer   integer
# V27   integer   integer
# V28   integer   integer
# V29   integer   integer
# V30   logical character
# V31   logical character
# V32   numeric   numeric
# V33   logical character
# V34   logical character
# V35   logical character
# V36   logical character
# V37   logical character
# V38   logical character
# V39 character   POSIXct
# V40   logical character
# V41   logical character
# V42   numeric   integer
# V43   logical character
# V44   logical character
# V45   logical character
# V46   logical character
# V47   numeric   numeric
# V48   logical character
# V49   logical character
# V50   logical character
# V51   logical character
# V52   logical character
# V53   logical character
# V54   logical character
# V55   logical character
# V56   logical character
# V57   logical character

一些有趣的事情推断出:

  • 两个字段是时间戳,duckdb方法正确地识别它们,解析它们,并存储为数字时间戳;因为我没有明确地告诉R列类,所以它们默认为character;
  • ds1中的logical列和ds2中的character列都为空(抱歉,这是我的数据);它们是不同的类这一事实表明duckdb默认为字符串形式的null而不是"bit",这可能是也可能不是您的考虑因素;
  • 只有两个柱被分类为numeric-和-integer;V11是真正的整数,这没问题;第二个V42显示用于区分numericinteger的启发式算法遗漏了一些东西。V42中包含小数部分的第一行位于第37159行。

修复数据差异

V42表明我们需要非常清楚地知道什么是进入和离开那个 parquet 生成器的。我猜这是在"CSV导入"步骤中,所以查看CSV Loading表明需要更改SAMPLE_SIZE。虽然效率相对较低,我将使用-1来指示它需要查看列中的所有值来确定其类。
验证该假设:

> str(DBI::dbGetQuery(con, "select * from read_csv_auto('quux.csv') limit 5")[c("V11","V42")])
'data.frame':   5 obs. of  2 variables:
 $ V11: int  4407 4408 4408 4407 4408
 $ V42: int  26 25 23 21 3
> str(DBI::dbGetQuery(con, "select * from read_csv_auto('quux.csv', SAMPLE_SIZE=-1) limit 5")[c("V11","V42")])
'data.frame':   5 obs. of  2 variables:
 $ V11: int  4407 4408 4408 4407 4408
 $ V42: num  26 25 23 21 3

的确,V11仍然很好,V42int变成了num
在使用这个新参数重新运行之后,

DBI::dbExecute(con, "copy (select * from read_csv_auto('quux.csv', SAMPLE_SIZE=-1)) to 'quux3.pq' (format parquet)")

离线验证确认所有值均正确。

相关问题