由于熟悉Iterator
中的peekable()
和Peek
,我以为我知道futures
版本的行为。
但令我惊讶的是,它不是在偷看--它实际上是从Stream中取出项目,这样当我调用next()
时,它们就不可用了。
当我在下面的代码中多次等待peek_first_message()
时,它会显示 * 不同的 * WebSocket消息。
#[derive(Debug, Deserialize)]
struct WsMessage {
... // elided
}
async fn peek_first_message(
mut read: Pin<&mut impl Stream<Item = Result<Message, Error>>>,
) -> anyhow::Result<()> {
let read = read.peekable();
tokio::pin!(read);
let x = read
.as_mut()
.peek()
.await
.ok_or(anyhow::anyhow!("websocket closed before first message"))?
.as_ref();
let ws: WsMessage = match x {
Ok(v) => { serde_json::from_slice(v.to_text()?.as_bytes())? },
Err(e) => { return Err(anyhow::anyhow!("fail")) },
};
println!("{ws:?}");
return Ok(())
}
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
let (ws_stream, _) = tokio_tungstenite::connect_async(url).await.expect("Failed to connect");
let (_, read) = ws_stream.split();
tokio::pin!(read);
peek_first_message(read.as_mut()).await;
peek_first_message(read.as_mut()).await;
peek_first_message(read.as_mut()).await;
}
我在peek_first_message
中只调用了peek
,但是当我检查print语句时,我发现process_messages
函数没有打印出被偷看的消息,这就好像我刚刚在peeking函数中调用了next()
,这是怎么回事?
1条答案
按热度按时间8yparm6h1#
这正是预期的行为,因为你在函数中创建了一个新的
Peekable
,它必须通过每次调用底层迭代器的next
来填充它的缓冲区。如果缓冲区为空,迭代器上的Peekable
也会调用next
。如果您想保留缓冲区,则必须传递
Peekable
,因为它包含缓冲区,而不是底层读取器。Sven Marnach为常规
Iterator
和Peekable
的类似问题提供了出色的answer