rust 克隆双通道

xoefb8l8  于 2023-10-20  发布在  其他
关注(0)|答案(1)|浏览(87)

如何从每个actix-ws回调访问一个send通道?
这是this的一个版本,在一个具体的例子中询问了一个漂亮的MRE。
具体来说,我对actix-ws example server做了尽可能少的修改,以保持它的整洁和简单。
确切的问题是在第一个注解行访问“send”('// use“send”...')

use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_ws::Message;
use futures_util::StreamExt;

async fn ws(req: HttpRequest, body: web::Payload, send: crossbeam::channel::Sender<u32>) -> Result<HttpResponse, Error> {
    // use "send", possible cloned

    let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;

    actix_rt::spawn(async move {
        while let Some(Ok(msg)) = msg_stream.next().await {
            match msg {
                Message::Ping(bytes) => {
                    if session.pong(&bytes).await.is_err() {
                        return;
                    }
                }
                Message::Text(s) => println!("Got text, {}", s),
                _ => break,
            }
        }

        let _ = session.close(None).await;
    });
    
    Ok(response)
}

#[actix_web::main]
async fn main() -> Result<(), anyhow::Error> {
    let (send, recv) = crossbeam::channel::unbounded();

    HttpServer::new(move || {
        App::new()
            .wrap(Logger::default())
            .route("/ws", web::get().to(|req: HttpRequest, body: web::Payload| {
                ws(req, body, send);
            }))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await?;

    Ok(())
}

错误代码:

error[E0277]: the trait bound `[closure@src/file:35:41: 35:79]: Handler<_>` is not satisfied
   --> src/file:35:41
    |
35  |               .route("/ws", web::get().to(|req: HttpRequest, body: web::Payload| {
    |  ______________________________________--_^
    | |                                      |
    | |                                      required by a bound introduced by this call
36  | |                 ws(req, body, send);
37  | |             }))
    | |_____________^ the trait `Handler<_>` is not implemented for closure `[closure@src/fuck-so.rs:35:41: 35:79]`
    |
note: required by a bound in `Route::to`
sqxo8psd

sqxo8psd1#

这里发生了几个问题:
1.你发布的错误是因为你的闭包没有返回任何东西,并且返回()不是Handler的有效响应。但是,您需要做的就是删除ws(...)之后的;
1.闭包不能引用局部变量send,因为处理程序必须是'static。你可以通过使用move关键字来修复这个问题,这样任何捕获的变量都会被移动到你的闭包中:

web::get().to(move |req: HttpRequest, body:
           // ^^^^
  1. HttpServer::new()的闭包可以被多次调用,因为它受到Fn的约束。在本例中,我们已经将send * 移入 *,但还需要将其 * 移出 *。你可以通过在闭包中执行.clone()-ing来实现这一点(幸运的是,Sender s的克隆成本很低):
HttpServer::new(move || {
    let send = send.clone(); // need to make a new copy to move into the route handler
    App::new(...

1.同样,在路由闭包中,您不能通过将变量传递给ws(...)来移动它,因为它需要多次调用。在其他情况下,你可以只通过引用传递,Sender不需要所有权来做任何事情,但是因为async函数返回Future s来捕获它们的参数,并且函数不允许返回引用它们捕获的值,所以无论如何你都需要.clone()它:

ws(req, body, send.clone())
               // ^^^^^^^^

通过这些更改,您的代码将编译。以下是完整的修复:

use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_ws::Message;
use futures_util::StreamExt;

async fn ws(
    req: HttpRequest,
    body: web::Payload,
    send: crossbeam::channel::Sender<u32>,
) -> Result<HttpResponse, Error> {
    // use "send", possible cloned

    let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;

    actix_rt::spawn(async move {
        while let Some(Ok(msg)) = msg_stream.next().await {
            match msg {
                Message::Ping(bytes) => {
                    if session.pong(&bytes).await.is_err() {
                        return;
                    }
                }
                Message::Text(s) => println!("Got text, {}", s),
                _ => break,
            }
        }

        let _ = session.close(None).await;
    });

    Ok(response)
}

#[actix_web::main]
async fn main() -> Result<(), anyhow::Error> {
    let (send, recv) = crossbeam::channel::unbounded();

    HttpServer::new(move || {
        let send = send.clone();
        App::new().wrap(Logger::default()).route(
            "/ws",
            web::get().to(move |req: HttpRequest, body: web::Payload| ws(req, body, send.clone())),
        )
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await?;

    Ok(())
}

相关问题