我想在warp中跟踪正在运行的连接,这样一个度量计数器在请求被处理之前递增,在请求被处理之后递减。
我试图通过在链的开始使用一个“no-op”过滤器和在链的结尾使用一个自定义日志过滤器来解决这个问题;差不多是这样
/// Increment the request count metric before the requests starts.
fn with_start_call_metrics() -> impl Filter<Extract = (), Error = Infallible> + Clone {
warp::any()
.and(path::full())
.map(|path: FullPath| {
HttpMetrics::inc_in_flight(path.as_str());
})
.untuple_one()
}
/// Decrement the request count metric after the request ended.
fn with_end_call_metrics() -> Log<fn(Info<'_>)> {
warp::log::custom(|info| {
HttpMetrics::dec_in_flight(info.path());
// ... track more metrics, e.g. info.elapsed() ...
})
}
当一个长时间运行的请求(下面代码中的/slow
)启动,并且在请求完全处理之前连接被丢弃时(例如:curl
上的CTRL-C
)。
在这种情况下,slow
路由只是被warp中止,并且永远不会到达下面的with_end_call_metrics
过滤器:
#[tokio::main]
async fn main() {
let hello = warp::path!("hello" / String).and_then(hello);
let slow = warp::path!("slow").and_then(slow);
warp::serve(
with_start_call_metrics()
.and(
hello.or(slow), // ... and more ...
)
// If the call (e.g. of `slow`) is cancelled, this is never reached.
.with(with_end_call_metrics()),
)
.run(([127, 0, 0, 1], 8080))
.await;
}
async fn hello(name: String) -> Result<impl warp::Reply, warp::Rejection> {
Ok(format!("Hello, {}!", name))
}
async fn slow() -> Result<impl warp::Reply, warp::Rejection> {
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(format!("That was slow."))
}
我知道这是正常的行为,推荐的方法是依赖于请求中类型的Drop
实现,因为它总是被调用,所以类似于:
async fn in_theory<F, T, E>(filter: F) -> Result<T, E>
where
F: Filter<Extract = T, Error = E>
{
let guard = TrackingGuard::new();
filter.await
}
但这不管用。我试着像这样使用wrap_fn
:
pub fn in_theory<F>(filter: F) -> Result<F::Extract, F::Error>
where
F: Filter + Clone,
{
warp::any()
.and(filter)
.wrap_fn(|f| async {
// ... magic here ...
f.await
})
}
但不管我怎么尝试,它总是以这样的错误结束:
error[E0277]: the trait bound `<F as warp::filter::FilterBase>::Error: reject::sealed::CombineRejection<Infallible>` is not satisfied
--> src/metrics.rs:255:25
|
255 | warp::any().and(filter).wrap_fn(|f| async { f.await })
| --- ^^^^^^ the trait `reject::sealed::CombineRejection<Infallible>` is not implemented for `<F as warp::filter::FilterBase>::Error`
| |
| required by a bound introduced by this call
这是不能指定的,因为reject::sealed
不是公共模块。任何帮助是赞赏!
1条答案
按热度按时间eqqqjvef1#
正如评论中所建议的,远离warp并使用Tower来构建中间件是有帮助的。为了直接使用
hyper::Server
,我不得不重写托管服务器的代码,但这只是一个轻微的不便。我从一个
HttpCallMetrics
服务开始,它 Package 了一个内部服务S
。由于我正在跟踪HTTP响应,所以我需要该服务最终生成一个hyper::Response
,这里用类型参数O
表示。这里的幻影数据是这样的,我可以在结构体上指示
O
;在这里不添加O
将防止Service
实现由于缺少特性边界而失败。因为它是关于HTTP指标的,所以该服务还专门处理HTTP请求,因此为任何主体类型B实现了
Service<Request<B>>
。同样, Package 的服务需要是相同的,其输出需要可转换为Response<O>
。HttpCallMetrics
服务将生成一个自定义的未来HttpCallMetricsFuture
,负责度量跟踪;这是为了避免在这里拳击。除此之外,由于metrics从不阻塞,它将其poll_ready
调用转发给 Package 的内部服务。调用时,将从请求创建一个
HttpCallMetricTracker
示例。这是一个包含基本请求信息(HTTP方法,版本,路径,开始时间示例)并实现Drop
的结构体-当删除时,它将注册请求已终止。无论是否取消或成功完成请求,此操作都有效。实现的future再次需要幻影数据黑客来跟踪服务的future的成功变量
O
和错误变量E
。因此,实现相对简单:本质上,
poll
调用被转发到 Package 的内部future,如果该future仍然是Poll::Pending
,则该方法退出。当future返回
Poll::Ready
时,将检查其结果变量,如果是Ok(result)
,则结果将转换为hyper::Response
。然后更新度量并返回响应。在错误变量的情况下,错误基本上按原样返回。
HttpCallMetricTracker
或多或少是微不足道的,它在构造时递增调用度量,在丢弃时递减调用度量。这里唯一有趣的一点是
state: Cell<ResultState>
字段。这允许Drop
实现推断是否应该记录某些内容。这里并没有严格要求就托管而言,代码现在看起来像这样:
也就是说,也存在tower_http::trace,它似乎确实支持上述所有内容。我可能会迁移到以后,但这个练习帮助我极大地了解塔摆在首位。