在Rust warp中请求前后执行中间件

jrcvhitl  于 2023-05-07  发布在  其他
关注(0)|答案(1)|浏览(160)

我想在warp中跟踪正在运行的连接,这样一个度量计数器在请求被处理之前递增,在请求被处理之后递减。
我试图通过在链的开始使用一个“no-op”过滤器和在链的结尾使用一个自定义日志过滤器来解决这个问题;差不多是这样

/// Increment the request count metric before the requests starts.
fn with_start_call_metrics() -> impl Filter<Extract = (), Error = Infallible> + Clone {
    warp::any()
        .and(path::full())
        .map(|path: FullPath| {
            HttpMetrics::inc_in_flight(path.as_str());
        })
        .untuple_one()
}

/// Decrement the request count metric after the request ended.
fn with_end_call_metrics() -> Log<fn(Info<'_>)> {
    warp::log::custom(|info| {
        HttpMetrics::dec_in_flight(info.path());
        // ... track more metrics, e.g. info.elapsed() ...
    })
}

当一个长时间运行的请求(下面代码中的/slow)启动,并且在请求完全处理之前连接被丢弃时(例如:curl上的CTRL-C)。
在这种情况下,slow路由只是被warp中止,并且永远不会到达下面的with_end_call_metrics过滤器:

#[tokio::main]
async fn main() {
    let hello = warp::path!("hello" / String).and_then(hello);
    let slow = warp::path!("slow").and_then(slow);

    warp::serve(
        with_start_call_metrics()
            .and(
                hello.or(slow), // ... and more ...
            )
            // If the call (e.g. of `slow`) is cancelled, this is never reached.
            .with(with_end_call_metrics()),
    )
    .run(([127, 0, 0, 1], 8080))
    .await;
}

async fn hello(name: String) -> Result<impl warp::Reply, warp::Rejection> {
    Ok(format!("Hello, {}!", name))
}

async fn slow() -> Result<impl warp::Reply, warp::Rejection> {
    tokio::time::sleep(Duration::from_secs(5)).await;
    Ok(format!("That was slow."))
}

我知道这是正常的行为,推荐的方法是依赖于请求中类型的Drop实现,因为它总是被调用,所以类似于:

async fn in_theory<F, T, E>(filter: F) -> Result<T, E>
where
    F: Filter<Extract = T, Error = E>
{
    let guard = TrackingGuard::new();
    filter.await
}

但这不管用。我试着像这样使用wrap_fn

pub fn in_theory<F>(filter: F) -> Result<F::Extract, F::Error>
where
    F: Filter + Clone,
{
    warp::any()
        .and(filter)
        .wrap_fn(|f| async { 
             // ... magic here ...
             f.await 
        })
}

但不管我怎么尝试,它总是以这样的错误结束:

error[E0277]: the trait bound `<F as warp::filter::FilterBase>::Error: reject::sealed::CombineRejection<Infallible>` is not satisfied
   --> src/metrics.rs:255:25
    |
255 |         warp::any().and(filter).wrap_fn(|f| async { f.await })
    |                     --- ^^^^^^ the trait `reject::sealed::CombineRejection<Infallible>` is not implemented for `<F as warp::filter::FilterBase>::Error`
    |                     |
    |                     required by a bound introduced by this call

这是不能指定的,因为reject::sealed不是公共模块。任何帮助是赞赏!

eqqqjvef

eqqqjvef1#

正如评论中所建议的,远离warp并使用Tower来构建中间件是有帮助的。为了直接使用hyper::Server,我不得不重写托管服务器的代码,但这只是一个轻微的不便。
我从一个HttpCallMetrics服务开始,它 Package 了一个内部服务S。由于我正在跟踪HTTP响应,所以我需要该服务最终生成一个hyper::Response,这里用类型参数O表示。
这里的幻影数据是这样的,我可以在结构体上指示O;在这里不添加O将防止Service实现由于缺少特性边界而失败。

#[derive(Clone)]
pub struct HttpCallMetrics<S, O> {
    inner: T,
    _phantom: PhantomData<O>,
}

impl<T, O> HttpCallMetrics<S, O> {
    pub fn new(inner: S) -> Self {
        Self {
            inner,
            _phantom: PhantomData::default(),
        }
    }
}

因为它是关于HTTP指标的,所以该服务还专门处理HTTP请求,因此为任何主体类型B实现了Service<Request<B>>。同样, Package 的服务需要是相同的,其输出需要可转换为Response<O>
HttpCallMetrics服务将生成一个自定义的未来HttpCallMetricsFuture,负责度量跟踪;这是为了避免在这里拳击。除此之外,由于metrics从不阻塞,它将其poll_ready调用转发给 Package 的内部服务。
调用时,将从请求创建一个HttpCallMetricTracker示例。这是一个包含基本请求信息(HTTP方法,版本,路径,开始时间示例)并实现Drop的结构体-当删除时,它将注册请求已终止。无论是否取消或成功完成请求,此操作都有效。

impl<S, B, O> Service<Request<B>> for HttpCallMetrics<S, O>
where
    S: Service<Request<B>>,
    S::Response: Into<hyper::Response<O>>,
{
    type Response = hyper::Response<O>;
    type Error = S::Error;
    type Future = HttpCallMetricsFuture<S::Future, O, Self::Error>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Request<B>) -> Self::Future {
        let tracker = HttpCallMetricTracker::start(&request);
        HttpCallMetricsFuture::new(self.inner.call(request), tracker)
    }
}

实现的future再次需要幻影数据黑客来跟踪服务的future的成功变量O和错误变量E

#[pin_project]
pub struct HttpCallMetricsFuture<F, O, E> {
    #[pin]
    future: F,
    tracker: HttpCallMetricTracker,
    _phantom: PhantomData<(O, E)>,
}

impl<F, O, E> HttpCallMetricsFuture<F, O, E> {
    fn new(future: F, tracker: HttpCallMetricTracker) -> Self {
        Self {
            future,
            tracker,
            _phantom: PhantomData::default(),
        }
    }
}

因此,实现相对简单:本质上,poll调用被转发到 Package 的内部future,如果该future仍然是Poll::Pending,则该方法退出。
当future返回Poll::Ready时,将检查其结果变量,如果是Ok(result),则结果将转换为hyper::Response。然后更新度量并返回响应。
在错误变量的情况下,错误基本上按原样返回。

impl<F, R, O, E> Future for HttpCallMetricsFuture<F, O, E>
where
    F: Future<Output = Result<R, E>>,
    R: Into<hyper::Response<O>>,
{
    type Output = Result<hyper::Response<O>, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let response = match this.future.poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(reply) => reply,
        };

        let result = match response {
            Ok(reply) => {
                let response = reply.into();
                this.tracker
                    .set_state(ResultState::Result(response.status(), response.version()));
                Ok(response)
            }
            Err(e) => {
                this.tracker.set_state(ResultState::Failed);
                Err(e)
            }
        };
        Poll::Ready(result)
    }
}

HttpCallMetricTracker或多或少是微不足道的,它在构造时递增调用度量,在丢弃时递减调用度量。
这里唯一有趣的一点是state: Cell<ResultState>字段。这允许Drop实现推断是否应该记录某些内容。这里并没有严格要求

struct HttpCallMetricTracker {
    version: Version,
    method: hyper::Method,
    path: String,
    start: Instant,
    state: Cell<ResultState>,
}

pub enum ResultState {
    /// The result was already processed.
    None,
    /// Request was started.
    Started,
    /// The result failed with an error.
    Failed,
    /// The result is an actual HTTP response.
    Result(StatusCode, Version),
}

impl HttpCallMetricTracker {
    fn start<B>(request: &Request<B>) -> Self {
        // increase "requests in flight" metric
        Self {
            // ...
            state: Cell::new(ResultState::None),
        }
    }

    fn set_state(&self, state: ResultState) {
        self.state.set(state)
    }

    fn duration(&self) -> Duration {
        Instant::now() - self.start
    }
}

impl Drop for HttpCallMetricTracker {
    fn drop(&mut self) {
        match self.state.replace(ResultState::None) {
            ResultState::None => {
                // This was already handled; don't decrement metrics again.
                return;
            }
            ResultState::Started => {
                // no request was actually performed.
            }
            ResultState::Failed => {
                // handle "fail" state
            }
            ResultState::Result(status, version) => {
                // handle "meaningful result" state
            }
        }

        // decrease "requests in flight" metric
    }
}

就托管而言,代码现在看起来像这样:

let make_svc = make_service_fn(|_conn| {
    let tx = shutdown_tx.clone();

    async move {
        // Convert the warp filter into a Tower service.
        let svc = warp::service(
            hello
                .or(slow)
                .or(filters::metrics_endpoint())
                .or(filters::health_endpoints())
                .or(filters::shutdown_endpoint(tx)),
        );

        // Wrap it into the metrics service.
        let svc = services::HttpCallMetrics::new(svc);

        Ok::<_, Infallible>(svc)
    }
});

let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
let listener = TcpListener::bind(addr).unwrap();

// Using a ServiceBuilder is not strictly required.
let builder = ServiceBuilder::new().service(make_svc);

Server::from_tcp(listener)
    .unwrap()
    .serve(builder)
    .with_graceful_shutdown(async move {
        shutdown_rx.recv().await.ok();
    })
    .await?;

也就是说,也存在tower_http::trace,它似乎确实支持上述所有内容。我可能会迁移到以后,但这个练习帮助我极大地了解塔摆在首位。

相关问题