该项目正在实现一个TcpListener服务。它监视pc事件并向TCP客户端广播。作为请求(一个生产者和多个消费者)。但代码引发了“future cannot be sent between threads safely”错误:
use tokio;
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::watch};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (tx, mut rx) = watch::channel::<Vec<u8>>(vec![]);
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
tokio::spawn(async move {
while rx.changed().await.is_ok() {
stream.write(&*rx.borrow()).await;
}
});
tx.send(vec![1u8, 2u8])?;
Ok(())
}
playground
给出:
error: future cannot be sent between threads safely
--> src/main.rs:9:18
|
9 | tokio::spawn(async move {
| __________________^
10 | | while rx.changed().await.is_ok() {
11 | | stream.write(&*rx.borrow()).await;
12 | | }
13 | | });
| |_____^ future created by async block is not `Send`
|
= help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `std::sync::RwLockReadGuard<'_, Vec<u8>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:11:40
|
11 | stream.write(&*rx.borrow()).await;
| ----------- ^^^^^^ await occurs here, with `rx.borrow()` maybe used later
| |
| has type `tokio::sync::watch::Ref<'_, Vec<u8>>` which is not `Send`
note: `rx.borrow()` is later dropped here
--> src/main.rs:11:46
|
11 | stream.write(&*rx.borrow()).await;
| ^
help: consider moving this into a `let` binding to create a shorter lived borrow
--> src/main.rs:11:27
|
11 | stream.write(&*rx.borrow()).await;
| ^^^^^^^^^^^^
note: required by a bound in `tokio::spawn`
--> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/task/spawn.rs:163:21
|
163 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
当我将Vec<u8>
更改为u8
&str
时,它可以工作。Vec、String和其他复杂类型将引发异常。
我在谷歌上搜索了一下,但还是很困惑。谢谢你的帮助。
1条答案
按热度按时间yvfmudvl1#
我认为因为
Ref<_,T>
本身并没有实现Send
,所以你需要把这个类型转换成Send
。我认为唯一明智的方法是在写它之前先克隆它。看起来很有效。此外,您需要使用
borrow_and_update
将该值标记为“seen”,以确保不会重复向stream
写入相同的值。