rust 如何使用reqwest执行并行异步HTTP GET请求?

toiithl6  于 2023-03-18  发布在  其他
关注(0)|答案(2)|浏览(167)

The async example很有用,但作为Rust和时雄的新手,我正在努力研究如何同时执行N个请求,使用来自向量的URL,并为每个URL创建一个响应HTML的迭代器作为字符串。
这怎么可能呢?

zu0ti5jz

zu0ti5jz1#

并发请求

自2014年11月04日起:

use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}
stream::iter(urls)

stream::iter
取一个字符串集合并将其转换为Stream

.map(|url| {

StreamExt::map
对流中的每个元素运行异步函数,并将元素转换为新类型。

let client = &client;
async move {

获取对Client的显式引用,并将该引用(不是原始的Client)移动到匿名异步块中。

let resp = client.get(url).send().await?;

使用Client的连接池启动异步GET请求并等待该请求。

resp.bytes().await

请求并等待响应的字节数。

.buffer_unordered(N);

StreamExt::buffer_unordered
将一个期货流转换为这些期货值的流,同时执行这些期货。

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each
将流转换回单个future,打印出沿着接收到的数据量,然后等待future完成。
另见:

无绑定执行

如果愿意,还可以将迭代器转换为future的迭代器,并使用future::join_all

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

我建议使用第一个示例,因为您通常希望限制并发性,bufferbuffer_unordered有助于实现这一点。

并行请求

并发请求通常就足够了,但是有时候你需要并行请求,在这种情况下,你需要生成一个任务。

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

主要区别是:

  • 我们使用tokio::spawn来执行独立的 * task * 工作。
  • 我们必须为每个任务分配自己的reqwest::Client,克隆一个共享客户端来使用连接池。
  • 当无法加入任务时,还会出现其他错误情况。

另见:

5gfr0r5j

5gfr0r5j2#

如果可能的话,我推荐使用std async和rayon。它们现在都很成熟,并且在std中的async{/* code here */}范围界限的情况下非常容易上手。您也可以使用时雄的特性集成https://docs.rs/async-std/1.10.0/async_std/#features

相关问题