rust 正在使用另一个任务唤醒器轮询未来

txu3uszq  于 2022-11-12  发布在  其他
关注(0)|答案(2)|浏览(181)

我正在尝试实现一个接收器 Package 器,它将刷新委托给另一个任务。基本上:

  • Package 器在构造时生成服务任务
  • 循环中的服务任务在基础流上运行flush(),如果poll_flush()返回Ready,则暂停
  • 当客户端将数据发送到 Package 的接收器中时,它将数据feed()发送到底层接收器,然后调用inner.poll_flush(),该inner.poll_flush()具有用于服务任务的上下文保持唤醒器。

想法应该是非常清楚的--客户端不需要费心冲洗Flume,但只要底层套接字/任何东西准备好,Flume就会自动冲洗。
(to避免处理多线程问题(假设所有这些都发生在同一个LocalSet中)
我不知道如何为服务任务构建前面提到的Waker--在时雄中是否可能?如果不可能--是否可能要求运行时使用它的句柄唤醒另一个任务?

pgpifvop

pgpifvop1#

我想不出如何为服务任务构建上述Waker
当您不是执行者(* 拥有要轮询的任务 * 的“顶级”非异步代码)时,您永远不会 * 构造 * Waker-您只使用已经存在的Waker
在此方案中,服务任务必须await您提供的某个future,并且在轮询该future时,其poll()方法将被赋予Waker。该唤醒程序是您需要唤醒以恢复服务任务的唤醒程序。如果尚未轮询该服务任务,则您将没有唤醒程序。但您也不需要它(因为任务 * 将 * 被轮询)。
在某些情况下,这个问题可以通过使用异步通道(比如tokio::sync模块中的通道)来简化,而不是为服务任务获取唤醒器。我不确定您的情况是否属于这些情况之一--可能不是,因为您实际上希望将唤醒器传递给poll_flush(),但也许有一种方法可以重新设计它,使其成为这种情况。

pcww981p

pcww981p2#

我想通了:


# [derive(Default)]

struct LocalNotify {
    is_on: bool,
    waker: Option<Waker>,
}

impl LocalNotify {
    fn delegate_poll<T, F: FnOnce(&mut Context<'_>) -> Poll<T>>(
        &mut self,
        f: F,
    ) -> Option<T> {
        let w = self.waker.as_ref().unwrap_or_else(|| noop_waker_ref());
        match f(&mut Context::from_waker(w)) {
            Poll::Pending => {
                self.is_on = true;
                None
            },
            Poll::Ready(r) => Some(r),
        }
    }

    fn notify(&mut self) {
        if !self.is_on {
            self.is_on = true;
            self.waker.take().map(|w| w.wake());
        }
    }

    fn poll_notified(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.is_on {
            self.is_on = false;
            self.waker = None;
            return Poll::Ready(());
        } else {
            if self.waker.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) {
                self.waker = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
    }

    async fn notified(&mut self) {
        poll_fn(|cx| self.poll_notified(cx)).await
    }
}

因此,如果要将poll_flush唤醒委派给另一个任务(即等待LocalNotify的任务),您可以:

fn poll_send(&mut self, cx: &mut Context<'_>, item: &String) -> Poll<Result<()>> {
    ...
    ready!(<MyStream as SinkExt<String>>::poll_ready_unpin(inner, cx))?;

    let initiate_flush = inner.write_buffer().is_empty();

    inner.start_send_unpin(item.clone())?;

    if initiate_flush {
        ready!(self.svc_notify.delegate_poll(|cx| <MyStream as SinkExt<String>>::poll_flush_unpin(inner, cx)))?;
    // ... or
    // self.svc_notify.notify();
    ...

(当然,该任务应该在select!中循环运行inner.flush()。您必须使用poll_fn()RefCell,以便在相关的poll调用期间暂时借用&mut

相关问题