gpt4 book ai didi

rust - Rusoto 异步使用 FuturesOrdered 组合器

转载 作者:行者123 更新时间:2023-11-29 08:31:45 29 4
gpt4 key购买 nike

我正在尝试使用 FuturesOrdered 发送并行异步 Rusoto SQS 请求:

use futures::prelude::*; // 0.1.26
use futures::stream::futures_unordered::FuturesUnordered;
use rusoto_core::{Region, HttpClient}; // 0.38.0
use rusoto_credential::EnvironmentProvider; // 0.17.0
use rusoto_sqs::{SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs, SqsClient}; // 0.38.0

fn main() {
let client = SqsClient::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
Region::UsWest2,
);

let messages: Vec<u32> = (1..12).map(|n| n).collect();
let chunks: Vec<_> = messages.chunks(10).collect();

let tasks: FuturesUnordered<_> = chunks.into_iter().map(|c| {
let batch = create_batch(c);
client.send_message_batch(batch)
}).collect();

let tasks = tasks
.for_each(|t| {
println!("{:?}", t);
Ok(())
})
.map_err(|e| println!("{}", e));

tokio::run(tasks);
}

fn create_batch(ids: &[u32]) -> SendMessageBatchRequest {
let queue_url = "https://sqs.us-west-2.amazonaws.com/xxx/xxx".to_string();
let entries = ids
.iter()
.map(|id| SendMessageBatchRequestEntry {
id: id.to_string(),
message_body: id.to_string(),
..Default::default()
})
.collect();

SendMessageBatchRequest {
entries,
queue_url,
}
}

任务正确完成,但 tokio::run(tasks) 不会停止。我假设这是因为 tasks.for_each() 会强制它继续运行并寻找更多的 future ?

为什么 tokio::run(tasks) 不停止?我是否正确使用了 FuturesOrdered

我也有点担心创建多达 60,000 个 futures 以完成并将它们推送到 FuturesUnordered 组合器时的内存使用情况。

最佳答案

我发现是 main 函数中的 SqsClient 导致它阻塞,因为即使任务已完成,它仍在做一些家务。

其中一位 Rusoto 人提供的解决方案是将其添加到 tokio::run

之上
std::mem::drop(client);

关于rust - Rusoto 异步使用 FuturesOrdered 组合器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56022266/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com