rust 错误:捕获的变量无法转义“FnMut”闭包体

yrwegjxp  于 2023-01-05  发布在  其他
关注(0)|答案(1)|浏览(161)

以下代码尝试在通过连接获得msg后异步更新主 Dataframe df(来自polars包)。
我看过关于堆栈溢出的“重复”帖子,但仍然不明白我做错了什么。我只是想可变地借用 Dataframe 并更新它,仅此而已!我用字符串尝试了一下,它工作得很好...

pub async fn new_handler(endpoint: &str) -> tokio::task::JoinHandle<()> {
    // Make master df for this handler
    let mut df = DataFrame::empty().lazy();
    // Make a stream for this handler
    let stream = new_stream(endpoint).await;
    let handle = tokio::spawn(async move {
        // let handle = tokio::spawn(async {
        stream
            .for_each(|msg| async move {
                match msg {
                    Ok(msg) => {
                        // Parse the json message into a struct
                        let jsonmsg: AggTrade =
                            serde_json::from_str(&msg.to_string()).expect("Failed to parse json");
                        let s0 = Series::new(
                            "price",
                            vec![jsonmsg.price.parse::<f32>().expect("Failed to parse price")],
                        );
                        let s1 = Series::new(
                            "quantity",
                            vec![jsonmsg
                                .quantity
                                .parse::<f32>()
                                .expect("Failed to parse quantity")],
                        );
                        // Create new dataframe from the json data
                        let df2 = DataFrame::new(vec![s0.clone(), s1.clone()]).unwrap().lazy();
                        // append the new data from df2 to the master df
                        df = polars::prelude::concat([df, df2], false, true)
                            .expect("Failed to concat");
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                    }
                }
            })
            .await
    });
    handle
}

出现以下错误:

error: captured variable cannot escape `FnMut` closure body
  --> src/websockets.rs:33:29
   |
27 |       let mut df = DataFrame::empty().lazy();
   |           ------ variable defined here
...
33 |               .for_each(|msg| async {
   |  ___________________________-_^
   | |                           |
   | |                           inferred to be a `FnMut` closure
34 | |                 match msg {
35 | |                     Ok(msg) => {
36 | |                         // Parse the json message into a struct
...  |
58 | |                         df = polars::prelude::concat([df.clone(), df2.clone()], false, true)
   | |                                                       -- variable captured here
...  |
86 | |                 }
87 | |             })
   | |_____________^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape
xfb7svmp

xfb7svmp1#

问题是传递给stream.for_each()的闭包可以被多次调用,但是df变量在被df.clone()调用引用时被移到闭包中。
下面是一个自包含的最小代码示例,显示了相同的编译错误。如果取消注解函数中的最后几行,它将无法编译:

async fn fails_moved_into_closure_called_multiple_times() {
    println!("fails_moved_into_closure_called_multiple_times():");
    let mut df = vec![];

    let closure = || async move {
        let new_value = df.len();
        println!("in the closure, pushing {}", new_value);
        df.push(new_value);
    };

    let future = closure();
    future.await;

    let future2 = closure();  // FAIL
    future2.await;

    println!("final value: {:?}", df);  // FAIL
}

事实上,Rust不能确定for_each函数是否会在多个线程中并发地多次调用闭包,下面是一个使用Arc<Mutex<T>>的解决方案,它是线程安全的,并且修复了所有权问题:

async fn fix_using_arc() {
    println!("fix_using_arc():");
    let df = Arc::new(Mutex::new(vec![]));

    let closure = || async {
        let my_df = Arc::clone(&df);
        let mut shared = my_df.lock().unwrap();
        let new_value = shared.len();
        println!("in the closure, pushing {}", new_value);
        shared.push(new_value);
    };

    let future = closure();
    future.await;

    let future2 = closure();
    future2.await;

    println!("final value: {:?}", df);
}

相关问题