rust 使用时雄Stream时出现固定问题

uinbv5nw  于 2023-02-23  发布在  其他
关注(0)|答案(2)|浏览(134)

我想在2s过去之前和2s过去之后以不同的方式处理传入的WebSocket消息。
这很棘手,因为我们只有一个read(显然不能被克隆),而且也不喜欢被传递给函数。
我想在处理消息和计时器时使用select!,然后在计时器融合第一个select!之后,在第2阶段再次使用select!,将read的可变借位传递给不同的处理函数。
原来由于固定,我根本无法将read传递给函数。

use std::time::Duration;

use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async; 

async fn wait_2_seconds() {
    tokio::time::sleep(Duration::from_secs(2)).await;
}

async fn process_messages(read: &mut impl Stream) {
    while let Some(m) = read.next().await {
        let data = m.unwrap().into_data();
        println!("{m:?}");
    }
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    // don't plan on sending anything to ws server so discard write half
    let (_, read) = ws_stream.split();

    tokio::select!{
        _ = process_messages(&mut read) => {}, 
        _ = wait_2_seconds() => {}, 
    };
    
    println!("phase 1 complete");
}

所以我不确定如何将read传递(mut借用)给函数。
错误消息说考虑使用Box::pin,但我意识到我甚至知道 * 如何 * 在这种情况下使用Box::pin。我尝试将process_messages参数类型更改为Box<Pin<&mut impl Stream>>,并意识到我需要帮助。

ssgvzors

ssgvzors1#

只需将read固定在main()中即可,您可以选择Box::pin() it或更好的tokio::pin!() it(或futures::pin_mut(),甚至是每夜std::pin::pin!()),还需要指定流的Item类型,然后将Pin<&mut impl Stream<Item = ...>>固定在process_messages()中:

use std::pin::Pin;

use tokio_tungstenite::tungstenite::error::Error;
use tokio_tungstenite::tungstenite::protocol::Message;

async fn process_messages(mut read: Pin<&mut impl Stream<Item = Result<Message, Error>>>) {
    while let Some(m) = read.next().await {
        let data = m.unwrap().into_data();
        println!("{data:?}");
    }
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    // don't plan on sending anything to ws server so discard write half
    let (_, read) = ws_stream.split();
    // Or `let read = Box::pin(read);`.
    tokio::pin!(read);

    tokio::select! {
        _ = process_messages(read.as_mut()) => {},
        _ = wait_2_seconds() => {},
    };

    println!("phase 1 complete");

    // Process after 2 seconds.
    process_messages(read).await;
}
vnjpjtjt

vnjpjtjt2#

错误消息说考虑使用Box::pin,但后来我意识到我甚至知道如何在这种情况下使用Box::pin。我试图将process_messages参数类型更改为Box<Pin<&mut impl Stream>>,并意识到我需要帮助。
Box::pin返回Pin<Box<_>>,而不是Box<Pin<_>>。还有一些小的调整和Box::pin编译:

/*
[dependencies]
tokio = { version = "*", features = ["full"] }
tokio-tungstenite = "*"
tungstenite = "*"
futures-util = "*"
url = "*"
*/
use std::time::Duration;
use std::pin::Pin;

use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async;

use tungstenite::protocol::Message;
use tungstenite::error::Error;

async fn wait_2_seconds() {
    tokio::time::sleep(Duration::from_secs(2)).await;
}

async fn process_messages(mut read: Pin<Box<impl Stream<Item=Result<Message, Error>>>>) {
    while let Some(m) = read.next().await {
        let data = m.unwrap().into_data();
        println!("{data:?}");
    }
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    // don't plan on sending anything to ws server so discard write half
    let (_, read) = ws_stream.split();

    tokio::select!{
        _ = process_messages(Box::pin(read)) => {}, 
        _ = wait_2_seconds() => {}, 
    };
    
    println!("phase 1 complete");
}

Rustexplorer(编译,但在运行时死机,因为你不能绑定到套接字)。

相关问题