除了一些小项目之外,我对Rust还很陌生。我想处理一些数据,目前正在使用Redis的数据,但之后需要与其他服务交换数据。所以我决定定义
#[async_trait]
pub trait Protocol {
async fn send(&self, value: &str) -> Result<()>;
async fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = String>>>>;
}
Redis传输的实现(使用fred 5.2.0)如下:
async fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = String>>>> {
let c = self.client.clone();
let key = self.input_key.clone();
let s = stream::unfold((c, key), |(c, key)| async move {
loop {
let value = match c.blpop::<(String, String), _>(&key, 2.0).await {
Ok((_, value)) => value,
_ => { continue; }
};
return Some((value, (c, key)));
}
});
Ok(Box::pin(s))
}
这是可行的,直到我尝试tokio::spawn
一个任务来处理这些消息。
所以我可以这样做:
pub async fn run(&self) {
let protocol = self.protocol.clone();
let s = protocol.subscribe().await.unwrap();
s.for_each(|x| async move {
println!("Received {}", x);
}).await;
}
当我试图将此作为一个任务运行时,我会遇到各种与发送/同步相关的问题,并感到失落。
一个三个三个一个
1条答案
按热度按时间5fjcxozz1#
只需将返回的
Stream
标记为Send
: