在我的代码中我创建了
let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
类型为use tokio::sync::Mutex;
。
我试图获取锁并将值添加到tokio::spawn
内部的列表中,这应该是异步工作的,但我得到了一个错误
value moved here, in previous iteration of loop
here
位于tokio::spawn(async move {...
的末尾
有人能告诉我解决这个问题的方法吗?这是我的代码
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpListener,
sync::broadcast,
};
use serde_json::json;
use serde_json::Value;
use tokio::sync::Mutex;
use std::sync::Arc;
mod user;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("localhost:8080").await.unwrap();
let (tx, _rx) = broadcast::channel(10);
let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
loop {
let (mut socket, addr) = listener.accept().await.unwrap();
let tx = tx.clone();
let mut rx = tx.subscribe();
tokio::spawn(async move {
let (reader, mut writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
tokio::select! {
result = reader.read_line(&mut line) => {
if result.unwrap() == 0 {
break;
}
tx.send((line.clone(), addr)).unwrap();
line.clear();
}
result = rx.recv() => {
let (msg, other_addr) = result.unwrap();
let json_msg: Value = json!(msg);
let mut vec = shared_vec.lock().await;
let mut indices_to_update = Vec::new();
for (index, usr) in vec.iter().enumerate() {
if usr.get_ip() != &addr {
indices_to_update.push(index);
}
}
for _index in indices_to_update {
let user = user::User::new(json_msg["username"].to_string(), addr);
vec.push(user);
}
if addr == other_addr {
writer.write_all(msg.as_bytes()).await.unwrap();
}
}
}
}
});
}
}
1条答案
按热度按时间oxf4rvwz1#
您必须在循环块中使用
Arc::clone(&shared_vec)
将clone
传递给async move block
。然后可以将新克隆传递给move block
。如果没有克隆,共享引用将被传递给其他线程/块,并且不允许再使用。