rust 当通道被丢弃时中止时雄任务

xwbd5t1u  于 2023-04-06  发布在  其他
关注(0)|答案(1)|浏览(181)

我有一个使用tonic实现的gRPC服务,它返回一个值流。这个流是在时雄任务中创建的,并使用tokio mpsc通道发送给客户端。
问题是,正在发送部分结果的派生任务在客户端断开连接后没有中止,并且接收器被丢弃,导致在发送到通道时出错。
简化代码:

#[tonic::async_trait]
impl ServiceTrait for MyService {
    type MyStream = BoxStream<'static, Result<MyResponse, tonic::Status>>;

    async fn get_stream(
        &self,
        _request: tonic::Request<()>,
    ) -> Result<tonic::Response<Self::MyStream>, tonic::Status> {
        let (tx, rx) = mpsc::channel::<Result<MyResponse, tonic::Status>>(1);

        // I need this task to be aborted when rx is dropped
        let producer_task_handle = tokio::spawn({
                // spawn many parallel tasks with ratelimiting
                ...
                // each task sends its result to tx
                tx.send(response).await.unwrap() // panics when rx is dropped after client disconnects
        });

        Ok(tonic::Response::new(ReceiverStream::new(rx).boxed()))
    }
}

当通道关闭时,我如何中止生产者任务?或者有更好的方法来做到这一点?我有一个返回流的工作版本,但这不再是一个选项。

ecfsfe2w

ecfsfe2w1#

经过一番考虑,我决定将CancellationTokenDropGuard一起使用
将接收器流 Package 在嵌入了DropGuard的结构中,可确保一旦流被丢弃,取消令牌将被取消,任务将被中止

#[derive(Debug)]
#[pin_project]
pub struct StreamWithData<T: Stream, D> {
    #[pin]
    inner: T,
    data: D,
}

impl<T: Stream, D> StreamWithData<T, D> {
    pub fn new(inner: T, data: D) -> Self {
        Self { inner, data }
    }
}

impl<T: Stream, D> Stream for StreamWithData<T, D> {
    type Item = T::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }
}

pub trait DataStream: Stream + Sized {
    fn attach_data<D>(self, data: D) -> StreamWithData<Self, D>
    where
        Self: Sized,
    {
        StreamWithData::new(self, data)
    }
}

impl<T: Stream> DataStream for T {}

使用方法:

let cancellation_token = CancellationToken::new();
        let drop_guard = cancellation_token.clone().drop_guard();
        // output stream guards the rx drop and cancels the root token
        let rx = ReceiverStream::new(rx).attach_data(drop_guard);


        let producer_task_handle = task::spawn({
                // do stuff
        })

        // abort task
        let abort_handle = producer_task_handle.abort_handle();
        task::spawn(async move {
            cancellation_token.cancelled().await;
            abort_handle.abort();
        });

相关问题