我有一个使用tonic实现的gRPC服务,它返回一个值流。这个流是在时雄任务中创建的,并使用tokio mpsc通道发送给客户端。
问题是,正在发送部分结果的派生任务在客户端断开连接后没有中止,并且接收器被丢弃,导致在发送到通道时出错。
简化代码:
#[tonic::async_trait]
impl ServiceTrait for MyService {
type MyStream = BoxStream<'static, Result<MyResponse, tonic::Status>>;
async fn get_stream(
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<Self::MyStream>, tonic::Status> {
let (tx, rx) = mpsc::channel::<Result<MyResponse, tonic::Status>>(1);
// I need this task to be aborted when rx is dropped
let producer_task_handle = tokio::spawn({
// spawn many parallel tasks with ratelimiting
...
// each task sends its result to tx
tx.send(response).await.unwrap() // panics when rx is dropped after client disconnects
});
Ok(tonic::Response::new(ReceiverStream::new(rx).boxed()))
}
}
当通道关闭时,我如何中止生产者任务?或者有更好的方法来做到这一点?我有一个返回流的工作版本,但这不再是一个选项。
1条答案
按热度按时间ecfsfe2w1#
经过一番考虑,我决定将
CancellationToken
与DropGuard
一起使用将接收器流 Package 在嵌入了
DropGuard
的结构中,可确保一旦流被丢弃,取消令牌将被取消,任务将被中止使用方法: