rust 发送并同步future::stream::unfold的问题

but5z9lq  于 2023-03-12  发布在  其他
关注(0)|答案(1)|浏览(157)

除了一些小项目之外,我对Rust还很陌生。我想处理一些数据,目前正在使用Redis的数据,但之后需要与其他服务交换数据。所以我决定定义

#[async_trait]
pub trait Protocol {
    async fn send(&self, value: &str) -> Result<()>;
    async fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = String>>>>;
}

Redis传输的实现(使用fred 5.2.0)如下:

async fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = String>>>> {
    let c = self.client.clone();
    let key = self.input_key.clone();
    let s = stream::unfold((c, key), |(c, key)| async move {
        loop {
            let value = match c.blpop::<(String, String), _>(&key, 2.0).await {
                Ok((_, value)) => value,
                _ => { continue; }
            };

            return Some((value, (c, key)));
        }
    });
    Ok(Box::pin(s))
}

这是可行的,直到我尝试tokio::spawn一个任务来处理这些消息。
所以我可以这样做:

pub async fn run(&self) {
    let protocol = self.protocol.clone();
    let s = protocol.subscribe().await.unwrap();
    s.for_each(|x| async move {
        println!("Received {}", x);
    }).await;
}

当我试图将此作为一个任务运行时,我会遇到各种与发送/同步相关的问题,并感到失落。
一个三个三个一个

5fjcxozz

5fjcxozz1#

只需将返回的Stream标记为Send

async fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>>;

相关问题