rust trait `std::marker::Sync`未为`std::sync::mpsc::blog实现< i32>

8fq7wneg  于 2023-10-20  发布在  其他
关注(0)|答案(1)|浏览(140)

我试图建立一个多线程应用程序使用MPSC和我运行到标题中的错误。我不确定这个用例的正确模式是什么--我正在寻找一种模式,它允许我克隆生产者通道并将其移动到一个新的线程中使用。
这个新线程将保持一个打开的WebSocket,并在收到WebSocket消息时通过生产者发送WebSocket消息数据的子集。消费者线程中需要来自其他线程的数据,这就是为什么我认为MPSC模式是一个合适的选择。
除了标题中的信息,它还显示了这些:

`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`

我可以/应该为此实现Send吗?现在使用RcPin合适吗?我相信这是因为我试图在async闭包中的.await上发送一个没有实现Send的类型,但我不知道该怎么做,也不知道在这种情况下该怎么做。
我可以把我的问题归结为:

use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};

#[tokio::main]
async fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();

    tokio::spawn(async move {
        let a = [1, 2, 3];
        let mut s = stream::iter(a.iter())
            .cycle()
            .for_each(move |int| async {
                tx.send(*int);
            })
            .await;
    });
}
hwamh0ep

hwamh0ep1#

你的代码有几个问题。第一个是在最里面的async块中缺少了一个move,所以编译器试图借用对tx的引用。这就是为什么你会得到Sendertx类型)不实现Sync的错误。
添加缺少的move后,您会得到一个不同的错误:

error[E0507]: cannot move out of `tx`, a captured variable in an `FnMut` closure

现在的问题是,for_each()将多次调用闭包,所以实际上不允许将tx移动到tx块中-因为在第一次调用闭包之后没有什么可移动的。
由于MPSC通道允许多个生产者,因此Sender实现了Clone,因此您可以在将tx移动到PMAC块之前简单地克隆它。这编译:

let (tx, _rx): (Sender<i32>, Receiver<i32>) = channel();

tokio::spawn(async move {
    let a = [1, 2, 3];
    let _s = stream::iter(a.iter())
        .cycle()
        .for_each(move |int| {
            let tx = tx.clone();
            async move {
                tx.send(*int).unwrap();
            }
        })
        .await;
});

Playground
最后,正如评论中所指出的,您几乎肯定希望在这里使用BRAC通道。虽然你创建的通道是无边界的,所以线程不会阻塞,但接收者 * 会 * 在没有消息时阻塞,因此会暂停整个执行器线程。
碰巧的是,时雄MPSC通道的发送端也实现了Sync,允许编译与您问题中的代码接近的代码:

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

tokio::spawn(async move {
    let a = [1, 2, 3];
    let _s = stream::iter(a.iter())
        .cycle()
        .for_each(|int| async {
            tx.send(*int).unwrap();
        })
        .await;
});

assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));

Playground

相关问题