gpt4 book ai didi

rust - 你如何用 Tokio 简单地包装同步网络 I/O?

转载 作者:行者123 更新时间:2023-12-03 11:37:04 26 4
gpt4 key购买 nike

这个问题在这里已经有了答案:





What is the best approach to encapsulate blocking I/O in future-rs?

(1 个回答)


1年前关闭。




不幸的是,我对 Rust 并发开发的理解存在明显的失误。这个问题源于数周反复努力解决一个看似“微不足道”的问题。

问题域
开发一个名为 twistrs 的 Rust 库,它是一个域名排列和枚举库。该库的目的和目标是提供一个根域(例如 google.com )并生成该域的排列(例如 guugle.com )并丰富该排列(例如它解析为 123.123.123.123 )。
它的目标之一是比它的 Python counterpart 执行得更快。最值得注意的是,网络调用,例如但不限于 DNS 查找。
目前的设计方案
该库背后的想法(除了作为学习场所之外)是开发一个非常简单的安全库,可以实现以满足各种需求。您(作为客户端)可以选择在内部直接与 permutationenrichment 模块交互,或者使用库提供的异步/并发实现。
Twistrs proposed architecture
请注意,内部没有共享状态。这可能是非常低效的,但暂时有点没有意义,因为它可以防止很多问题。
当前问题
在内部,DNS 查找是同步完成的,并且本质上是阻塞的。我无法将其转换为并发代码。我能得到的最接近的是使用 tokio mpsc channel ,并执行 spawn 单个 tokio 任务:

use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let domain = Domain::new("google.com").unwrap();

let _permutations = domain.all().unwrap().collect::<Vec<String>>();

let (mut tx, mut rx) = mpsc::channel(1000);

tokio::spawn(async move {
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());

let dns_resolution = domain_metadata.dns_resolvable();

if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
}
});

while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
}
也就是说,精明的读者会立即注意到这会阻塞,并有效地以任何方式同步运行 DNS 查找。
由于在 move 上完成了 tx (并且 tx 不是 impl Copy ),因此无法尝试在 for 循环中生成 Tokio 任务:
for (i, v) in _permutations.into_iter().enumerate() {
tokio::spawn(async move {
let domain_metadata = DomainMetadata::new(v.clone());

let dns_resolution = domain_metadata.dns_resolvable();

if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
});
}
删除 await 当然不会导致任何事情发生,因为需要轮询生成的任务。我如何有效地将所有这些同步调用包装到异步任务中,这些任务可以独立运行并最终汇聚到一个集合中?
我遇到的一个类似的 Rust 项目是 batch_resolve ,它在这方面做得非常好(!)。但是,我发现实现对于我想要实现的目标来说异常复杂(也许我错了)。非常感谢任何帮助或实现这一目标的见解。
如果您想快速重现此问题,您可以简单地克隆项目并使用本文中的第一个代码片段更新 examples/twistrs-cli/main.rs

最佳答案

编辑:我误解了你的问题,没有意识到 DNS 解析本身不是异步的。以下方法实际上不适用于同步代码,并且只会由于阻塞代码而导致执行程序停止,但如果您切换到异步解析方法,我会保留它。我建议使用 tokio 的异步 lookup_host() 如果这符合您的需求。

异步执行器旨在处理大量并行任务,因此您可以尝试为每个请求生成一个新任务,使用 Semaphore 一次创建运行任务数量的上限。其代码可能如下所示:

let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once

for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let mut tx = tx.clone(); // every task will have its own copy of the sender
let permit = semaphore.acquire_owned().await; // wait until we have a permit

let dns_resolution = domain_metadata.dns_resolvable();
tokio::spawn(async move {
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
drop(permit); // explicitly release the permit, to make sure it was moved into this task
}); // note: task spawn results and handle dropped here
}

while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
如果额外任务的开销被证明太显着,您可以尝试将这些任务组合成一个 future ,使用像 FuturesUnordered 这样的工具。来自 futures箱。这使您可以获取任意大的 future 列表,并在单个任务中重复轮询它们。

关于rust - 你如何用 Tokio 简单地包装同步网络 I/O?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63548625/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com