websocket futures_util::流::可查看未查看

k3bvogb1  于 2023-03-02  发布在  其他
关注(0)|答案(1)|浏览(120)

由于熟悉Iterator中的peekable()Peek,我以为我知道futures版本的行为。
但令我惊讶的是,它不是在偷看--它实际上是从Stream中取出项目,这样当我调用next()时,它们就不可用了。
当我在下面的代码中多次等待peek_first_message()时,它会显示 * 不同的 * WebSocket消息。

#[derive(Debug, Deserialize)]
struct WsMessage {
   ... // elided
}

async fn peek_first_message(
    mut read: Pin<&mut impl Stream<Item = Result<Message, Error>>>,
) -> anyhow::Result<()> {
    let read = read.peekable();
    tokio::pin!(read);
    let x = read
        .as_mut()
        .peek()
        .await
        .ok_or(anyhow::anyhow!("websocket closed before first message"))?
        .as_ref();
    let ws: WsMessage = match x {
        Ok(v) => { serde_json::from_slice(v.to_text()?.as_bytes())? },
        Err(e) => { return Err(anyhow::anyhow!("fail")) },
    };
    println!("{ws:?}");
    return Ok(())
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
    let (ws_stream, _) = tokio_tungstenite::connect_async(url).await.expect("Failed to connect");
    let (_, read) = ws_stream.split();
    tokio::pin!(read);

    peek_first_message(read.as_mut()).await;
    peek_first_message(read.as_mut()).await;
    peek_first_message(read.as_mut()).await;
}

我在peek_first_message中只调用了peek,但是当我检查print语句时,我发现process_messages函数没有打印出被偷看的消息,这就好像我刚刚在peeking函数中调用了next(),这是怎么回事?

8yparm6h

8yparm6h1#

这正是预期的行为,因为你在函数中创建了一个新的Peekable,它必须通过每次调用底层迭代器的next来填充它的缓冲区。如果缓冲区为空,迭代器上的Peekable也会调用next
如果您想保留缓冲区,则必须传递Peekable,因为它包含缓冲区,而不是底层读取器。
Sven Marnach为常规IteratorPeekable的类似问题提供了出色的answer

相关问题