我试图建立一个多线程应用程序使用MPSC和我运行到标题中的错误。我不确定这个用例的正确模式是什么--我正在寻找一种模式,它允许我克隆生产者通道并将其移动到一个新的线程中使用。
这个新线程将保持一个打开的WebSocket,并在收到WebSocket消息时通过生产者发送WebSocket消息数据的子集。消费者线程中需要来自其他线程的数据,这就是为什么我认为MPSC模式是一个合适的选择。
除了标题中的信息,它还显示了这些:
`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`
我可以/应该为此实现Send
吗?现在使用Rc
或Pin
合适吗?我相信这是因为我试图在async
闭包中的.await
上发送一个没有实现Send
的类型,但我不知道该怎么做,也不知道在这种情况下该怎么做。
我可以把我的问题归结为:
use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};
#[tokio::main]
async fn main() {
let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();
tokio::spawn(async move {
let a = [1, 2, 3];
let mut s = stream::iter(a.iter())
.cycle()
.for_each(move |int| async {
tx.send(*int);
})
.await;
});
}
1条答案
按热度按时间hwamh0ep1#
你的代码有几个问题。第一个是在最里面的
async
块中缺少了一个move
,所以编译器试图借用对tx
的引用。这就是为什么你会得到Sender
(tx
类型)不实现Sync
的错误。添加缺少的
move
后,您会得到一个不同的错误:现在的问题是,
for_each()
将多次调用闭包,所以实际上不允许将tx
移动到tx
块中-因为在第一次调用闭包之后没有什么可移动的。由于MPSC通道允许多个生产者,因此
Sender
实现了Clone
,因此您可以在将tx
移动到PMAC块之前简单地克隆它。这编译:Playground
最后,正如评论中所指出的,您几乎肯定希望在这里使用BRAC通道。虽然你创建的通道是无边界的,所以线程不会阻塞,但接收者 * 会 * 在没有消息时阻塞,因此会暂停整个执行器线程。
碰巧的是,时雄MPSC通道的发送端也实现了
Sync
,允许编译与您问题中的代码接近的代码:Playground