rust 如何在没有.clone()的情况下拆分相同的LazyFrame?

wkyowqbh  于 2023-10-20  发布在  其他
关注(0)|答案(2)|浏览(89)

我有一个相当大的.arrow文件(150 Gb),我需要根据一些过滤器分成不同的部分。我开始使用Python Polars实现进行原型设计(JIT编译器在这里真的很有帮助),现在我正试图将我的Python实现移植到Rust中,希望能加快生产过程。我的代码看起来像这样(显然我的代码有更复杂的过滤器,但我在这里简化了问题):

use polars::prelude::*;
use polars_io::ipc::IpcReader;

fn main() -> PolarsResult<()> {
    // opening .arrow file and parse it as LazyFrame
    let file = std::fs::File::open("./myData.arrow").expect("file not found");
    let df = IpcReader::new(file).finish()?.lazy();

    // Split LazyFrame into various parts
    let df1 = df.filter( col("Volume").gt_eq(0).and( col("Volume").lt(1000) ) );
    let df2 = df.filter( col("Volume").gt_eq(1000).and( col("Volume").lt(2000) ) );
    // ... much more filters
    // let df300 = df.filter( col("Volume").gt_eq(n).and( col("Volume").lt(n+1) ) );

    // printing LazyFrames here to simplify the issue, in the real world I would create new .arrow files based on each filters
    println!("{}", df1.collect()?);
    println!("{}", df2.collect()?);
    // ...
    // println!("{}", df300.collect()?);
    Ok(())
}

显然,这段代码无法编译,因为检查器希望我.clone()我的df LazyFrame。下面是在.clone()之后的工作代码:

use polars::prelude::*;
use polars_io::ipc::IpcReader;

fn main() -> PolarsResult<()> {
    // opening .arrow file and parse it as LazyFrame
    let file = std::fs::File::open("./myData.arrow").expect("file not found");
    let df = IpcReader::new(file).finish()?.lazy();

    // Split LazyFrame into various parts
    let df1 = df.clone().filter( col("Volume").gt_eq(0).and( col("Volume").lt(1000) ) );
    let df2 = df.clone().filter( col("Volume").gt_eq(1000).and( col("Volume").lt(2000) ) );
    // ... much more filters
    // let df300 = df.clone().filter( col("Volume").gt_eq(n).and( col("Volume").lt(n+1) ) );

    // printing LazyFrames here to simplify the issue, in the real world I would create new .arrow files based on each filters
    println!("{}", df1.collect()?);
    println!("{}", df2.collect()?);
    // ...
    // println!("{}", df300.collect()?);
    Ok(())
}

现在编译器没有抱怨,所以我可以使用以下命令构建它:

cargo build --release

问题是,与Python中的相同实现相比,我看到至少60倍的性能。所以这让我有点困惑。我知道多个.clone()是导致性能下降的原因,但我最初的理解是Polars会在LazyFrame的上下文中优化这样的.clone(),并避免不必要的数据副本,或者我错了?
我试着用&df过滤df的引用,但搜索引擎检查器也抱怨了。我设法使用引用而不是复制,使用DataFrame而不是LazyFrame,但不知何故,我失去了很多我在Python中使用的方法(比如with_columns在Rust中不适用于DataFrame,或者过滤器需要masks而不是表达式等)。
我错过了一些明显的东西吗?什么是正确的方法来重新使用我原来的df LazyFrame,而不是.clone()-ing它所有的,最好不直接使用DataFrames:)?

wvt8vs2t

wvt8vs2t1#

有很多方法可以优化你的代码,其中一种方法是使用智能指针:您可以使用Rc::clone而不是.clone(),因为它不像.clone()那样进行深层复制(参考:Rc, the Reference Counted Smart Pointer
第二种方法是使用unsafe rust,不建议用于安全应用程序(结帐unsafe keyword docs

hjzp0vay

hjzp0vay2#

问题就像是LazyFrame上的.filter方法通过设计消耗了示例(参见self参数)。
然后在内部,opt_state是一个副本,新的逻辑计划被构建并用于LazyFrame的新示例。
所以有一点开销是的。然而,当我们深入研究LogicalPlan时,我们会看到逻辑计划的一部分被 Package 在Arc中。例如:LogicalPlan::AnonymousScan::function
这意味着并不是逻辑计划的每一位都通过发出.clone()调用来复制。
但是如果您注意到性能下降,您可能希望尝试使用DataFrame而不是LazyFrame。因为它的API不消耗它的示例,所以它通过引用使用self

// do not call `.lazy()`
    let df = IpcReader::new(file).finish()?;

    // Split LazyFrame into various parts
    let df1 = df.filter( col("Volume").gt_eq(0).and( col("Volume").lt(1000) ) );
    let df2 = df.filter( col("Volume").gt_eq(1000).and( col("Volume").lt(2000) ) );

从理论上讲,这段代码的行为应该仍然与克隆LazyFrame的代码非常相似,但开销似乎要少一些。

相关问题