Rust: passing the incoming connection's IP address down the service stack with hyper

pdkcd3nj  posted on 2022-11-12 in Other
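I am trying to write a server using hyper that passes the remote (client) address of the incoming connection down to a stack of Layers (which I build using ServiceBuilder).
I have *tried* using the example from the hyper docs and this example from the Rust forums; however, both of these

  • pass the data down to a *single* handler function, not a stack of service layers
  • have a return type of Result<Response, Infallible>, which I *don't* want (I want to be able to drop a connection without returning a response).

Here is one of my attempts (I have tried several approaches):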

use std::{
    net::SocketAddr,
    time::Duration,
};

use hyper::{
    Body, Request, Response, Server,
    server::conn::AddrStream,
    service::{
        make_service_fn,
        service_fn,
    },
};
use tower::{
    Service, ServiceBuilder,
    timeout::TimeoutLayer,
};

async fn dummy_handle(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let response_text = format!(
        "{:?} {} {}", req.version(), req.method(), req.uri()
    );
    let response = Response::new(Body::from(response_text));
    Ok(response)
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    // Dummy stack of service layers I want to wrap.
    let service = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_millis(1000 * 60)))
        .service_fn(dummy_handle);

    let make_svc = make_service_fn(|socket: &AddrStream| {
        let remote_addr = socket.remote_addr();
        let mut inner_svc = service.clone();
        let outer_svc = service_fn(move |mut req: Request<Body>| async {
            req.extensions_mut().insert(remote_addr);
            inner_svc.call(req)
        });

        async move { outer_svc }
    });

    Server::bind(&addr)
        .serve(make_svc)
        .await?;

    Ok(())
}

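I fully appreciate that including the error messages is helpful; however, this is one of those cases where the Rust compiler spits out pages (or at least a screenful) of cryptic output, so I will limit myself to a couple of choice examples.
First, I frequently get this: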

type mismatch resolving `<impl Future<Output = [async output]> as Future>::Output == Result<_, _>`

for example, preceding this:

39 |            let outer_svc = service_fn(move |mut req: Request<Body>| async {
   |  _____________________________________-___________________________________-
   | | ____________________________________|
   | ||
40 | ||             req.extensions_mut().insert(remote_addr);
41 | ||             inner_svc.call(req)
42 | ||         });
   | ||         -
   | ||_________|
   | |__________the expected closure
   |            the expected `async` block
...
48 |            .serve(make_svc)
   |             ----- ^^^^^^^^ expected struct `service::util::ServiceFn`, found enum `Result`
   |             |
   |             required by a bound introduced by this call

The error message that follows seems to contradict it entirely:

[ several lines identical to above elided here ]

48  |            .serve(make_svc)
    |             ^^^^^ expected enum `Result`, found struct `service::util::ServiceFn`

I just can't figure out what the compiler wants from me.

72qzrwbm #1

Try this:

use std::{net::SocketAddr, time::Duration, convert::Infallible};

use hyper::{
    server::conn::AddrStream,
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};
use tower::{Service, ServiceBuilder};

async fn dummy_handle(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let response_text = format!("{:?} {} {}", req.version(), req.method(), req.uri());
    let response = Response::new(Body::from(response_text));
    Ok(response)
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    // Dummy stack of service layers I want to wrap.
    let service = ServiceBuilder::new()
        .timeout(Duration::from_millis(1000 * 60))
        .service_fn(dummy_handle);

    let make_svc = make_service_fn(|socket: &AddrStream| {
        let remote_addr = socket.remote_addr();
        let mut inner_svc = service.clone();
        let outer_svc = service_fn(move |mut req: Request<Body>| {
            req.extensions_mut().insert(remote_addr);
            inner_svc.call(req)
        });

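        // make_service_fn expects a future that resolves to a Result,
        // so the per-connection service is wrapped in Ok.
        async { Ok::<_, Infallible>(outer_svc) }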
    });

    Server::bind(&addr).serve(make_svc).await?;

    Ok(())
}

Your closure returns a future that itself returns another future:

|| async {
            req.extensions_mut().insert(remote_addr);
            inner_svc.call(req)
}

This is a Future<Output = Future<...>>.
So you need to turn your closure into this:

|| {
            req.extensions_mut().insert(remote_addr);
            inner_svc.call(req)
}
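
The difference can be reproduced without hyper. The sketch below uses only the standard library; `call`, `nested`, `flat`, and `block_on` are hypothetical names made up for illustration, with `call` standing in for `inner_svc.call(req)`. It is meant to show the shape of the types, not how hyper drives futures.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, for illustration only (hypothetical helper).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Stand-in for `inner_svc.call(req)`: returns a future without running it.
fn call(req: u32) -> impl Future<Output = Result<u32, String>> {
    async move { Ok(req + 1) }
}

/// Mirrors `|req| async { inner_svc.call(req) }`: the output is itself a future.
fn nested(req: u32) -> impl Future<Output = impl Future<Output = Result<u32, String>>> {
    async move { call(req) }
}

/// Mirrors `|req| inner_svc.call(req)`: the output is the `Result`.
fn flat(req: u32) -> impl Future<Output = Result<u32, String>> {
    call(req)
}
```

Driving `nested(1)` to completion with `block_on` yields another future that still has to be polled before a `Result` appears, whereas `flat(1)` resolves to the `Result` in one step. That extra layer is why the compiler reports an `async` block where it expected a `Result`.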

mepcadol #2

If you don't need ServiceBuilder, you can do the following:

use std::{
    net::SocketAddr,
};
use std::convert::Infallible;

use hyper::{
    Body, Request, Response, Server,
    server::conn::AddrStream,
    service::{
        make_service_fn,
        service_fn,
    },
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    let make_svc = make_service_fn(|socket: &AddrStream| {
        let remote_addr = socket.remote_addr();
        async move {
            Ok::<_, Infallible>(service_fn(move |req: Request<Body>| async move {
                println!("remote_addr:  {:?}, request: {:?}", remote_addr, req);
                Ok::<_, Infallible>(
                    Response::new(Body::from(format!(
                        "{:?} {} {}", req.version(), req.method(), req.uri()
                    )))
                )
            }))
        }
    });

    let _ = Server::bind(&addr).serve(make_svc).await?;

    Ok(())
}

Otherwise:

use std::{
    net::SocketAddr,
    time::Duration,
    task::{Context, Poll},
    future::Future,
    pin::Pin
};

use hyper::{
    http,
    Body, Request, Response, Server
};

use tower::{
    timeout::TimeoutLayer,
    Service, ServiceBuilder,
};

#[derive(Debug)]
pub struct CustomService;

impl Service<Request<Body>> for CustomService {
    type Response = Response<Body>;
    type Error = http::Error;
    type Future = Pin<Box<dyn Future<Output=Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) }

    fn call(&mut self, req: Request<Body>) -> Self::Future {
        let rsp = Response::builder();
        let body = Body::from(format!("{:?} {} {}", req.version(), req.method(), req.uri()));
        let rsp = rsp.status(200).body(body).unwrap();
        Box::pin(async { Ok(rsp) })
    }
}

#[derive(Debug)]
pub struct MakeService;

impl<T> Service<T> for MakeService {
    type Response = CustomService;
    type Error = std::io::Error;
    type Future = Pin<Box<dyn Future<Output=Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Ok(()).into() }

    fn call(&mut self, _: T) -> Self::Future {
        Box::pin(async { Ok(CustomService) })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
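    // Note: applied here, the timeout wraps MakeService (building the
    // per-connection service), not the individual requests.
    let service = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_secs(60)))
        .service(MakeService);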
    let _ = Server::bind(&addr).serve(service).await?;
    Ok(())
}
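
For reference, the question's approach stores the address in the request's extensions, and a service further down the stack would read it back by type (with hyper 0.14, something like `req.extensions().get::<SocketAddr>()`). Below is a minimal std-only sketch of that type-keyed lookup. The `Extensions` struct and `remote_addr_of` helper here are simplified stand-ins written for illustration, not the `http` crate's implementation.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::net::SocketAddr;

/// Simplified, hypothetical stand-in for a request's extension map:
/// at most one value is stored per concrete type.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Box<dyn Any>>,
}

impl Extensions {
    /// Store a value, replacing any previous value of the same type.
    fn insert<T: Any>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Look a value up by its type; `None` if nothing of that type was stored.
    fn get<T: Any>(&self) -> Option<&T> {
        self.map
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

/// What a downstream handler would do: fetch the peer address, if present.
fn remote_addr_of(ext: &Extensions) -> Option<SocketAddr> {
    ext.get::<SocketAddr>().copied()
}
```

Because the lookup is keyed by type, the outer service and the inner handler only need to agree on the type (`SocketAddr` here). Wrapping the address in a dedicated newtype would be one way to avoid clashing with other layers that store a `SocketAddr`.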
