gpt4 book ai didi

rust - 如何使用reqwest执行并行异步HTTP GET请求?

转载 作者:行者123 更新时间:2023-12-03 11:48:18 26 4
gpt4 key购买 nike

The async example很有用,但是对于Rust和Tokio来说是新手,我正在努力找出如何立即执行N个请求,使用向量中的URL并为每个URL创建响应HTML的迭代器作为字符串的方法。

怎么办呢?

最佳答案

并发请求
从reqwest 0.10开始:

use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
let client = Client::new();

let urls = vec!["https://api.ipify.org"; 2];

let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);

bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}

stream::iter(urls)

stream::iter
收集字符串并将其转换为 Stream
.map(|url| {

StreamExt::map
在流中的每个元素上运行一个异步函数,并将该元素转换为新类型。
let client = &client;
async move {

取得对 Client的明确引用,然后将引用(不是原始的 Client)移到匿名异步块中。
let resp = client.get(url).send().await?;

使用 Client的连接池启动异步GET请求,然后等待该请求。
resp.bytes().await

请求并等待响应的字节。
.buffer_unordered(N);

StreamExt::buffer_unordered
将 future 流转换为那些 future 值(value)流,同时执行 future 。
bodies
.for_each(|b| {
async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
})
.await;

StreamExt::for_each
将流转换回单个将来,打印出沿途收到的数据量,然后等待将来完成。
也可以看看:
  • Join futures with limited concurrency
  • How to merge iterator of streams?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
  • What is the difference between `then`, `and_then` and `or_else` in Rust futures?

  • 无限执行
    如果愿意,还可以将迭代器转换为 future 迭代器,并使用 future::join_all :
    use futures::future; // 0.3.4
    use reqwest::Client; // 0.10.1
    use tokio; // 0.2.11

    #[tokio::main]
    async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
    let client = &client;
    async move {
    let resp = client.get(url).send().await?;
    resp.bytes().await
    }
    }))
    .await;

    for b in bodies {
    match b {
    Ok(b) => println!("Got {} bytes", b.len()),
    Err(e) => eprintln!("Got an error: {}", e),
    }
    }
    }
    我鼓励使用第一个示例,因为您通常想限制并发性, bufferbuffer_unordered可以帮助您。
    平行要求
    并发请求通常就足够了,但是有时候 需要并行请求。在这种情况下,您需要产生一个任务。
    use futures::{stream, StreamExt}; // 0.3.8
    use reqwest::Client; // 0.10.9
    use tokio; // 0.2.24, features = ["macros"]

    const PARALLEL_REQUESTS: usize = 2;

    #[tokio::main]
    async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
    .map(|url| {
    let client = client.clone();
    tokio::spawn(async move {
    let resp = client.get(url).send().await?;
    resp.bytes().await
    })
    })
    .buffer_unordered(PARALLEL_REQUESTS);

    bodies
    .for_each(|b| async {
    match b {
    Ok(Ok(b)) => println!("Got {} bytes", b.len()),
    Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
    Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
    }
    })
    .await;
    }
    主要区别在于:
  • 我们使用 tokio::spawn 在单独的任务中执行工作。
  • 我们必须为每个任务分配自己的reqwest::Client。作为recommended,我们克隆了一个共享客户端以使用连接池。
  • 当无法加入任务时,还有另外一种错误情况。

  • 也可以看看:
  • What is the difference between concurrent programming and parallel programming?
  • What is the difference between concurrency and parallelism?
  • What is the difference between concurrency, parallelism and asynchronous methods?
  • 关于rust - 如何使用reqwest执行并行异步HTTP GET请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62790008/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com